/*
 * SessionChat.cpp
 *
 * Copyright (C) 2025 by Posit Software, PBC
 *
 * Unless you have received this program directly from Posit Software pursuant
 * to the terms of a commercial license agreement with Posit Software, then
 * this program is licensed to you under the terms of version 3 of the
 * GNU Affero General Public License. This program is distributed WITHOUT
 * ANY EXPRESS OR IMPLIED WARRANTY, INCLUDING THOSE OF NON-INFRINGEMENT,
 * MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE. Please refer to the
 * AGPL (http://www.gnu.org/licenses/agpl-3.0.txt) for more details.
 *
 */

#include "SessionChat.hpp"
#include "SessionNodeTools.hpp"

#include "chat/ChatConstants.hpp"
#include "chat/ChatTypes.hpp"
#include "chat/ChatLogging.hpp"
#include "chat/ChatInstallation.hpp"
#include "chat/ChatStaticFiles.hpp"

#include <algorithm>
#include <chrono>
#include <map>
#include <queue>
#include <set>
#include <vector>
#include <functional>

#include <boost/thread/mutex.hpp>

#include <boost/algorithm/string.hpp>
#include <boost/asio.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/regex.hpp>

#include <core/Exec.hpp>
#include <core/FileInfo.hpp>
#include <core/FileSerializer.hpp>
#include <core/http/Request.hpp>
#include <core/http/Response.hpp>
#include <core/http/URL.hpp>
#include <core/http/Util.hpp>
#include <core/system/Process.hpp>
#include <core/system/System.hpp>
#include <core/system/Xdg.hpp>

#include <r/RExec.hpp>
#include <r/ROptions.hpp>
#include <r/RRoutines.hpp>
#include <r/RSexp.hpp>
#include <r/RUtil.hpp>

#include <session/SessionModuleContext.hpp>
#include <session/SessionOptions.hpp>
#include <session/SessionPersistentState.hpp>
#include <session/SessionSourceDatabase.hpp>
#include <session/SessionUrlPorts.hpp>
#include <session/SessionScopes.hpp>
#include <session/prefs/UserPrefs.hpp>

#include "../SessionDirs.hpp"
#include "environment/EnvironmentUtils.hpp"

#include "session-config.h"

// Use a default section of 'chat' for errors / warnings
#ifdef LOG_ERROR
# undef LOG_ERROR
# define LOG_ERROR(error) LOG_ERROR_NAMED("chat", error)
#endif

using namespace rstudio::core;
using namespace rstudio::core::system;
using namespace rstudio::session::module_context;

namespace rstudio {
namespace session {
namespace modules {
namespace chat {

namespace {

// ============================================================================
// Process management
// ============================================================================
// State describing the chat backend child process. The -1 / empty initial
// values read as "no backend yet"; the code that assigns them is outside this
// section of the file.
// NOTE(review): presumably these are only touched from the main thread, as no
// lock is declared for them here -- confirm.
PidType s_chatBackendPid = -1;
int s_chatBackendPort = -1;
std::string s_chatBackendUrl;
// Starts at zero; presumably compared against kMaxRestartAttempts -- confirm.
int s_chatBackendRestartCount = 0;

// ============================================================================
// Suspension blocking
// ============================================================================
// Starts out false. NOTE(review): per the section title this presumably blocks
// session suspension while the chat backend is busy -- confirm against usage.
static bool s_chatBusy = false;


// Selective imports from chat modules to avoid namespace pollution
namespace chat_constants = rstudio::session::modules::chat::constants;
namespace chat_types = rstudio::session::modules::chat::types;
namespace chat_logging = rstudio::session::modules::chat::logging;
namespace chat_installation = rstudio::session::modules::chat::installation;
namespace chat_staticfiles = rstudio::session::modules::chat::staticfiles;

// Constants used throughout
using chat_constants::kProtocolVersion;
using chat_constants::kMaxQueueSize;
using chat_constants::kMaxBufferSize;
using chat_constants::kMaxDelay;
using chat_constants::kMaxRestartAttempts;
using chat_constants::kPositAiDirName;
using chat_constants::kServerScriptPath;

// Types used throughout
using chat_types::SemanticVersion;

// Logging functions used throughout
using chat_logging::chatLogLevel;
using chat_logging::setChatLogLevel;
using chat_logging::setBackendMinLogLevel;
using chat_logging::getBackendMinLogLevel;
using chat_logging::getLogLevelPriority;
using chat_logging::shouldLogBackendMessage;
using chat_logging::rs_chatSetLogLevel;

// Installation functions used throughout
using chat_installation::locatePositAiInstallation;
using chat_installation::verifyPositAiInstallation;
using chat_installation::getInstalledVersion;

// Static file handler (used once for URI registration)
using chat_staticfiles::handleAIChatRequest;

// Logging functions are now in chat/ChatLogging.hpp/.cpp

// ============================================================================
// Execution Tracking (for cancellation support)
// ============================================================================
// R is single-threaded, so only one execution can be active at a time,
// but we need to track cancelled IDs to handle pre-cancellation of queued requests
//
// s_executionTrackingMutex is held while s_cancelledTrackingIds is consulted
// and modified (see handleExecuteCode); presumably it guards
// s_currentTrackingId as well -- confirm against the remaining uses.
boost::mutex s_executionTrackingMutex;
std::string s_currentTrackingId;  // Empty string when not executing
std::set<std::string> s_cancelledTrackingIds;  // TrackingIds that have been cancelled

// ============================================================================
// Streaming Output Notification Queue with Lifecycle Management
// ============================================================================
// Notifications are queued from onConsoleOutput (any thread) and drained
// from onBackgroundProcessing (main thread only) to ensure thread safety
//
// Lifecycle management prevents unbounded growth and stale notifications:
// - Track active executions to filter notifications for completed/cancelled requests
// - Queue size limit prevents memory exhaustion from noisy executions
// - Automatic cleanup when weak_ptr expires (backend died)

// A single chunk of captured console output waiting in s_notificationQueue
// until it is forwarded to the backend as a streaming notification.
struct PendingNotification
{
   // Copies all four values into the notification.
   PendingNotification(const std::string& id,
                       const std::string& t,
                       const std::string& c,
                       boost::weak_ptr<core::system::ProcessOperations> ops)
      : trackingId(id),
        type(t),
        content(c),
        weakOps(ops)
   {
   }

   // Tracking id of the execution that produced the output
   std::string trackingId;

   // "stdout" or "stderr"
   std::string type;

   // The captured text itself
   std::string content;

   // Non-owning handle to the backend process the output was captured for
   boost::weak_ptr<core::system::ProcessOperations> weakOps;
};

// Global state (all protected by mutex)
//
// s_notificationQueueMutex is taken around every access to both
// s_notificationQueue and s_activeTrackingIds in this file section
// (queueNotification, the drain/clear helpers and register/unregister).
static boost::mutex s_notificationQueueMutex;
static std::queue<PendingNotification> s_notificationQueue;  // FIFO, capped at kMaxQueueSize by queueNotification
static std::set<std::string> s_activeTrackingIds;  // Active executions

// ============================================================================
// Code Execution Context (for console output capture)
// ============================================================================
class ChatExecContext
{
public:
   ChatExecContext(bool captureOutput,
                   const std::string& trackingId,
                   boost::shared_ptr<core::system::ProcessOperations> ops)
      : captureOutput_(captureOutput),
        trackingId_(trackingId),
        weakOps_(ops),
        lastStdoutFlush_(std::chrono::steady_clock::now()),
        lastStderrFlush_(std::chrono::steady_clock::now())
   {
   }

   ~ChatExecContext()
   {
      // Flush any remaining buffered output
      flushBuffers();
      disconnect();
   }

   void connect()
   {
      connection_ = module_context::events().onConsoleOutput.connect(
          boost::bind(&ChatExecContext::onConsoleOutput, this, _1, _2));
   }

   void disconnect()
   {
      connection_.disconnect();
   }

   void flushBuffers()
   {
      boost::mutex::scoped_lock lock(bufferMutex_);
      flushStdoutLocked();
      flushStderrLocked();
   }

   std::string getOutput() const
   {
      boost::mutex::scoped_lock lock(bufferMutex_);
      return outputBuffer_;
   }

   std::string getError() const
   {
      boost::mutex::scoped_lock lock(bufferMutex_);
      return errorBuffer_;
   }

private:
   void onConsoleOutput(module_context::ConsoleOutputType type,
                        const std::string& output)
   {
      if (!captureOutput_)
         return;

      boost::mutex::scoped_lock lock(bufferMutex_);

      if (type == module_context::ConsoleOutputNormal)
      {
         outputBuffer_ += output;
         stdoutBuffer_ += output;

         auto now = std::chrono::steady_clock::now();
         bool sizeThreshold = stdoutBuffer_.size() >= kMaxBufferSize;
         bool timeThreshold = (now - lastStdoutFlush_) >= kMaxDelay;

         if (sizeThreshold || timeThreshold)
            flushStdoutLocked();
      }
      else
      {
         errorBuffer_ += output;
         stderrBuffer_ += output;

         auto now = std::chrono::steady_clock::now();
         bool sizeThreshold = stderrBuffer_.size() >= kMaxBufferSize;
         bool timeThreshold = (now - lastStderrFlush_) >= kMaxDelay;

         if (sizeThreshold || timeThreshold)
            flushStderrLocked();
      }
   }

   void flushStdoutLocked()
   {
      if (!stdoutBuffer_.empty())
      {
         queueNotification(trackingId_, "stdout", stdoutBuffer_, weakOps_);
         stdoutBuffer_.clear();
         lastStdoutFlush_ = std::chrono::steady_clock::now();
      }
   }

   void flushStderrLocked()
   {
      if (!stderrBuffer_.empty())
      {
         queueNotification(trackingId_, "stderr", stderrBuffer_, weakOps_);
         stderrBuffer_.clear();
         lastStderrFlush_ = std::chrono::steady_clock::now();
      }
   }

   void queueNotification(const std::string& trackingId,
                          const std::string& type,
                          const std::string& content,
                          boost::weak_ptr<core::system::ProcessOperations> weakOps)
   {
      boost::mutex::scoped_lock lock(s_notificationQueueMutex);

      // Enforce queue size limit to prevent unbounded growth
      if (s_notificationQueue.size() >= kMaxQueueSize)
      {
         WLOG("Notification queue full (size={}), dropping oldest notification",
              s_notificationQueue.size());
         s_notificationQueue.pop();
      }

      s_notificationQueue.push(PendingNotification(trackingId, type, content, weakOps));
   }

   bool captureOutput_;
   std::string trackingId_;
   boost::weak_ptr<core::system::ProcessOperations> weakOps_;

   mutable boost::mutex bufferMutex_;
   std::string outputBuffer_;  // Complete accumulated output
   std::string errorBuffer_;   // Complete accumulated error
   std::string stdoutBuffer_;  // Pending stdout to flush
   std::string stderrBuffer_;  // Pending stderr to flush
   std::chrono::steady_clock::time_point lastStdoutFlush_;
   std::chrono::steady_clock::time_point lastStderrFlush_;

   RSTUDIO_BOOST_CONNECTION connection_;
};

// Echo source code to the console with prompts (like evaluate does): the first
// line is prefixed with "> " and every following line with "+ ".
void echoSourceCode(const std::string& code)
{
   std::vector<std::string> codeLines;
   boost::split(codeLines, code, boost::is_any_of("\n"));

   // A trailing newline leaves one empty element at the end; it is not echoed
   if (!codeLines.empty() && codeLines.back().empty())
      codeLines.pop_back();

   bool isFirstLine = true;
   for (const std::string& line : codeLines)
   {
      const std::string prompt = isFirstLine ? "> " : "+ ";
      module_context::consoleWriteOutput(prompt + line + "\n");
      isFirstLine = false;
   }
}

// ============================================================================
// JSON-RPC Message Handling
// ============================================================================

// Buffer for accumulating stdout data from backend
std::string s_backendOutputBuffer;

// Regex for parsing Content-Length header (case-insensitive per HTTP spec)
// Matches only at start of line to avoid false matches in log output
// The header name must follow either the ^ anchor or a CRLF pair; optional
// whitespace may follow the colon, and capture group 1 holds the decimal
// length digits.
boost::regex s_contentLengthRegex("(?:^|\r\n)Content-Length:\\s*(\\d+)", boost::regex::icase);

// Map of notification method names to handler functions
using NotificationHandler = std::function<void(const json::Object&)>;
std::map<std::string, NotificationHandler> s_notificationHandlers;

// ============================================================================
// Forward Declarations
// ============================================================================
Error startChatBackend();

// ============================================================================
// JSON-RPC Notification Handling
// ============================================================================

// Registers `handler` as the callback for JSON-RPC notifications named
// `method`, replacing any handler previously registered under that name.
void registerNotificationHandler(const std::string& method, NotificationHandler handler)
{
   // `handler` is a by-value sink parameter: move it into the map rather than
   // copying the std::function (and whatever state it captured) a second time.
   s_notificationHandlers[method] = std::move(handler);
}

// Dispatches a JSON-RPC notification to the handler registered for `method`.
// Notifications without a registered handler are logged as a warning and
// otherwise ignored.
void handleNotification(const std::string& method, const json::Object& params)
{
   const auto handlerPos = s_notificationHandlers.find(method);
   if (handlerPos == s_notificationHandlers.end())
   {
      WLOG("Unhandled notification method: {}", method);
      return;
   }

   handlerPos->second(params);
}

// ============================================================================
// JSON-RPC Request Handling
// ============================================================================

void sendJsonRpcResponse(core::system::ProcessOperations& ops,
                         const json::Value& id,
                         const json::Value& result)
{
   json::Object response;
   response["jsonrpc"] = "2.0";
   response["id"] = id;
   response["result"] = result;

   std::string body = response.write();
   std::string message = fmt::format(
       "Content-Length: {}\r\n\r\n{}",
       body.size(),
       body
   );

   if (chatLogLevel() >= 2)
   {
      DLOG("Sending JSON-RPC response: {}", body);
   }

   Error error = ops.writeToStdin(message, false);
   if (error)
   {
      ELOG("Failed to write JSON-RPC response to backend stdin: {}", error.getMessage());
   }
}

void sendJsonRpcError(core::system::ProcessOperations& ops,
                      const json::Value& id,
                      int code,
                      const std::string& message)
{
   json::Object errorObj;
   errorObj["code"] = code;
   errorObj["message"] = message;

   json::Object response;
   response["jsonrpc"] = "2.0";
   response["id"] = id;
   response["error"] = errorObj;

   std::string body = response.write();
   std::string frameMessage = fmt::format(
       "Content-Length: {}\r\n\r\n{}",
       body.size(),
       body
   );

   if (chatLogLevel() >= 2)
   {
      DLOG("Sending JSON-RPC error response: {}", body);
   }

   Error error = ops.writeToStdin(frameMessage, false);
   if (error)
   {
      ELOG("Failed to write JSON-RPC error response to backend stdin: {}", error.getMessage());
   }
}

void sendStreamingOutput(core::system::ProcessOperations& ops,
                         const std::string& trackingId,
                         const std::string& type,
                         const std::string& content)
{
   json::Object notification;
   notification["jsonrpc"] = "2.0";
   notification["method"] = "runtime/executionOutput";

   json::Object params;
   params["trackingId"] = trackingId;
   params["type"] = type;
   params["content"] = content;
   params["isIncremental"] = true;

   notification["params"] = params;

   std::string body = notification.write();
   std::string message = fmt::format(
       "Content-Length: {}\r\n\r\n{}",
       body.size(),
       body
   );

   if (chatLogLevel() >= 2)
   {
      DLOG("Sending streaming output: trackingId={}, type={}, length={}",
           trackingId, type, content.size());
   }

   // Best-effort send - don't abort execution on failure
   Error error = ops.writeToStdin(message, false);
   if (error)
   {
      ELOG("Failed to send streaming output: {}", error.getMessage());
   }
}

// Removes every queued notification belonging to `trackingId` from the global
// queue and sends them, in queue order, to the backend via `ops`. Notifications
// for other tracking ids stay queued in their original relative order.
void drainNotificationQueueForExecution(core::system::ProcessOperations& ops,
                                         const std::string& trackingId)
{
   std::vector<PendingNotification> toProcess;

   // Phase 1: Extract notifications to process (hold lock briefly).
   // Entries are moved, not copied, so the time spent under the lock does not
   // grow with the size of the captured output.
   {
      boost::mutex::scoped_lock lock(s_notificationQueueMutex);

      std::queue<PendingNotification> remaining;
      while (!s_notificationQueue.empty())
      {
         PendingNotification& front = s_notificationQueue.front();

         if (front.trackingId == trackingId)
         {
            toProcess.push_back(std::move(front));
         }
         else
         {
            // Preserve notifications from concurrent executions
            remaining.push(std::move(front));
         }

         s_notificationQueue.pop();
      }

      s_notificationQueue = std::move(remaining);
   }

   // Phase 2: Send notifications (no lock held, no race condition)
   for (const auto& notif : toProcess)
   {
      sendStreamingOutput(ops, notif.trackingId, notif.type, notif.content);
   }
}

// Records `trackingId` in the set of active executions. Registering an id
// that is already present has no further effect.
void registerActiveExecution(const std::string& trackingId)
{
   boost::mutex::scoped_lock queueGuard(s_notificationQueueMutex);

   s_activeTrackingIds.insert(trackingId);
}

// Removes `trackingId` from the set of active executions. Unknown ids are
// ignored.
void unregisterActiveExecution(const std::string& trackingId)
{
   boost::mutex::scoped_lock queueGuard(s_notificationQueueMutex);

   s_activeTrackingIds.erase(trackingId);
}

void clearNotificationsForExecution(const std::string& trackingId)
{
   boost::mutex::scoped_lock lock(s_notificationQueueMutex);

   std::queue<PendingNotification> remaining;
   while (!s_notificationQueue.empty())
   {
      PendingNotification notif = s_notificationQueue.front();
      s_notificationQueue.pop();

      if (notif.trackingId != trackingId)
      {
         remaining.push(notif);
      }
      // Drop notifications for cancelled trackingId
   }

   s_notificationQueue = remaining;
}

// Answers a runtime/getActiveSession request with a short description of the
// current session: language ("R"), R version, session id and mode ("console").
void handleGetActiveSession(core::system::ProcessOperations& ops,
                            const json::Value& requestId)
{
   json::Object sessionInfo;
   sessionInfo["language"] = "R";
   sessionInfo["version"] = module_context::rVersion();
   sessionInfo["sessionId"] = module_context::activeSession().id();
   sessionInfo["mode"] = "console";

   DLOG("Handling runtime/getActiveSession request");

   sendJsonRpcResponse(ops, requestId, sessionInfo);
}

void handleGetDetailedContext(core::system::ProcessOperations& ops,
                               const json::Value& requestId)
{
   DLOG("Handling runtime/getDetailedContext request");

   // Get current time in human-readable format
   // Format: "Friday, October 3, 2025 at 10:32:34 PM CDT"
   std::time_t now = std::time(nullptr);
   std::tm* localTime = std::localtime(&now);
   char dateBuffer[128];
   std::strftime(dateBuffer, sizeof(dateBuffer), "%A, %B %d, %Y at %I:%M:%S %p %Z", localTime);
   std::string currentDate(dateBuffer);

   // Session object
   // TODO: Future work - detect and support Python sessions
   json::Object session;
   session["language"] = "R";
   session["version"] = module_context::rVersion();
   session["mode"] = "console";
   session["sessionId"] = module_context::activeSession().id();

   // Get R workspace variables with smart filtering
   json::Array variablesArray;
   size_t totalVariableCount = 0;
   {
      using namespace rstudio::r::sexp;

      // Note: R_GlobalEnv objects don't need explicit protection as they're
      // already protected by the environment itself. rProtect is available
      // for any intermediate R operations.
      Protect rProtect;
      std::vector<Variable> vars;

      // List ALL variables (including hidden) to get accurate total count
      listEnvironment(R_GlobalEnv,
                      true,   // includeAll - get everything first
                      false,  // includeLastDotValue - we'll filter manually
                      &vars);

      totalVariableCount = vars.size();

      // Smart filtering: exclude functions and hidden variables
      std::vector<Variable> filteredVars;
      filteredVars.reserve(vars.size());

      for (const Variable& var : vars)
      {
         const std::string& name = var.first;
         SEXP varSEXP = var.second;

         // Skip hidden variables (starting with '.')
         if (!name.empty() && name[0] == '.')
            continue;

         // Skip functions (CLOSXP = user functions, SPECIALSXP/BUILTINSXP = built-ins)
         int objType = TYPEOF(varSEXP);
         if (objType == CLOSXP || objType == SPECIALSXP || objType == BUILTINSXP)
            continue;

         filteredVars.push_back(var);
      }

      // OPTIMIZATION: Extract sizes WITHOUT full JSON conversion
      // This is much faster than calling varToJson() on all variables
      struct VarWithSize {
         Variable var;
         int64_t size;
      };

      std::vector<VarWithSize> varsWithSize;
      varsWithSize.reserve(filteredVars.size());

      for (const Variable& var : filteredVars)
      {
         // Use .rs.estimatedObjectSize() which is optimized for:
         // - ALTREP objects (doesn't materialize)
         // - Large objects (returns estimate quickly)
         // - Edge cases (null pointers, etc.)
         SEXP sizeSEXP;
         Error error = r::exec::RFunction(".rs.estimatedObjectSize")
            .addParam(var.second)
            .call(&sizeSEXP, &rProtect);

         int64_t size = 0;
         if (!error && sizeSEXP != R_NilValue)
         {
            if (Rf_isInteger(sizeSEXP) && Rf_length(sizeSEXP) > 0)
            {
               int rSize = INTEGER(sizeSEXP)[0];
               // Check for NA and negative values
               if (rSize != NA_INTEGER && rSize >= 0)
                  size = static_cast<int64_t>(rSize);
            }
            else if (Rf_isReal(sizeSEXP) && Rf_length(sizeSEXP) > 0)
            {
               double rSize = REAL(sizeSEXP)[0];
               // Check for NA/NaN, negative values, and overflow
               if (!ISNA(rSize) && !ISNAN(rSize) && rSize >= 0.0 &&
                   rSize <= static_cast<double>(INT64_MAX))
                  size = static_cast<int64_t>(rSize);
            }
         }

         varsWithSize.push_back(VarWithSize{var, size});
      }

      // Sort by size (largest first)
      std::sort(varsWithSize.begin(),
                varsWithSize.end(),
                [](const VarWithSize& a, const VarWithSize& b)
                {
                   return a.size > b.size;
                });

      // Take top MAX_VARIABLES and convert to JSON
      // Limit to 100 to balance context richness with performance.
      // Most workspaces have <100 data objects; for larger workspaces,
      // showing the largest objects provides the most useful context.
      const size_t MAX_VARIABLES = 100;
      size_t numToInclude = std::min(varsWithSize.size(), MAX_VARIABLES);

      for (size_t i = 0; i < numToInclude; i++)
      {
         json::Value jsonVal = environment::varToJson(R_GlobalEnv, varsWithSize[i].var);

         // Skip variables that fail conversion (though this is rare)
         if (jsonVal.isNull() || !jsonVal.isObject())
         {
            DLOG("Failed to convert variable '{}' to JSON, skipping",
                 varsWithSize[i].var.first);
            continue;
         }

         // Transform to SessionVariable schema (name, type, displayName)
         // The backend expects simplified variables for context, not full details
         json::Object varObj = jsonVal.getObject();
         json::Object simplifiedVar;

         // Extract name (required) - fallback to variable name from pair if missing
         std::string name;
         if (varObj.hasMember("name") && varObj["name"].isString())
            name = varObj["name"].getString();
         else
            name = varsWithSize[i].var.first;
         simplifiedVar["name"] = name;

         // Extract type (required) - fallback to "unknown" if missing
         std::string type;
         if (varObj.hasMember("type") && varObj["type"].isString())
            type = varObj["type"].getString();
         else
            type = "unknown";
         simplifiedVar["type"] = type;

         // TODO: Reevaluate providing a richer displayName in the future.
         // For now, displayName is the same as name for simplicity.
         // Previous implementation attempted to extract description/value:
         //
         // // Extract description and use as displayName (required)
         // // Description contains human-readable summary like "data.frame: 32 obs. of 11 variables"
         // std::string displayName;
         // if (varObj.hasMember("description") && varObj["description"].isString())
         // {
         //    displayName = varObj["description"].getString();
         // }
         //
         // if (displayName.empty())
         // {
         //    // Fallback: use "type: value" format if no description
         //    std::string value;
         //    if (varObj.hasMember("value") && varObj["value"].isString())
         //       value = varObj["value"].getString();
         //
         //    if (value.empty())
         //       displayName = type;
         //    else
         //       displayName = type + ": " + value;
         // }
         simplifiedVar["displayName"] = name;

         variablesArray.push_back(std::move(simplifiedVar));
      }

      // Log filtering results
      if (filteredVars.size() > MAX_VARIABLES)
      {
         DLOG("Variable context: showing {} of {} variables (filtered from {} total)",
              variablesArray.getSize(), filteredVars.size(), totalVariableCount);
      }
      else if (filteredVars.size() < totalVariableCount)
      {
         DLOG("Variable context: showing {} variables (filtered from {} total)",
              variablesArray.getSize(), totalVariableCount);
      }
   }

   // Add metadata about filtering
   json::Object variablesMeta;
   variablesMeta["totalCount"] = static_cast<int>(totalVariableCount);
   variablesMeta["shownCount"] = static_cast<int>(variablesArray.getSize());
   variablesMeta["filtered"] = (variablesArray.getSize() < totalVariableCount);

   session["variables"] = variablesArray;
   session["variablesMeta"] = variablesMeta;

   // PlatformInfo object
   json::Object platformInfo;
   // TODO: Future work - check if active plots exist
   platformInfo["hasPlots"] = false;
   platformInfo["platformVersion"] = RSTUDIO_VERSION;
   platformInfo["currentDate"] = currentDate;

   // Main result
   json::Object result;

   // Enumerate open files from source database
   // This includes all documents currently open in the editor, both saved and unsaved
   json::Array openFilesArray;
   {
      std::vector<boost::shared_ptr<source_database::SourceDocument>> docs;
      Error error = source_database::list(&docs);

      if (error)
      {
         WLOG("Failed to enumerate open files for detailed context: {}", error.getSummary());
         // Continue with empty array rather than failing the entire request
      }
      else
      {
         DLOG("Building open files context from {} documents", docs.size());

         for (const auto& pDoc : docs)
         {
            // Defensive null check
            if (!pDoc)
            {
               WLOG("Encountered null document pointer in source database, skipping");
               continue;
            }

            json::Object fileObj;

            // Construct URI based on document type
            if (pDoc->isUntitled())
            {
               // Use untitled: scheme for unsaved documents (matches Positron/VSCode convention)
               fileObj["uri"] = fmt::format("untitled:{}", pDoc->id());
            }
            else
            {
               std::string docPath = pDoc->path();
               if (docPath.empty())
               {
                  WLOG("Titled document {} has empty path, skipping", pDoc->id());
                  continue;
               }

               // Use file:/// scheme with absolute path for saved documents (RFC 8089)
               FilePath resolvedPath = module_context::resolveAliasedPath(docPath);
               std::string absPath = resolvedPath.getAbsolutePath();

               // Windows: file:///C:/Users/file.txt
               // Linux/macOS: file:///home/user/file.txt

#ifdef _WIN32
               // Normalize path separators on Windows (C:\path -> C:/path)
               std::replace(absPath.begin(), absPath.end(), '\\', '/');
               fileObj["uri"] = fmt::format("file:///{}", absPath);
#else
               fileObj["uri"] = fmt::format("file://{}", absPath);
#endif
            }

            // Document state flags
            fileObj["isModified"] = pDoc->dirty();
            fileObj["isUntitled"] = pDoc->isUntitled();

            // Visibility: documents with relativeOrder > 0 are visible tabs in the editor
            // relativeOrder == 0 means document is loaded but not visible in a tab
            fileObj["isVisible"] = (pDoc->relativeOrder() > 0);

            openFilesArray.push_back(fileObj);
         }

         DLOG("Open files context: returning {} files", openFilesArray.getSize());
      }
   }
   result["openFiles"] = openFilesArray;
   result["session"] = session;
   result["platformInfo"] = platformInfo;

   sendJsonRpcResponse(ops, requestId, result);
}

// Handles the "runtime/executeCode" JSON-RPC request: evaluates R code sent by
// the backend and replies with the captured output, error text, plots, and
// elapsed time.
//
// Required params (a -32602 error response is sent if any is missing/invalid):
//   language   - must be "r"; any other value yields a -32602 error response
//   code       - source text to parse and evaluate
//   trackingId - identifier used for cancellation and notification filtering
//
// Optional params.options members (defaults used when absent):
//   captureOutput - default true; forwarded to ChatExecContext
//   capturePlot   - default false; when true, the plot helper is invoked after
//                   evaluation
//   timeout       - default 30000; read here but not referenced again in this
//                   function
//
// Flow: check for pre-cancellation, register the execution, echo the code,
// parse it with R's parse(), evaluate each expression through withVisible()
// (printing visible results), report any error to the console, optionally
// capture a plot, flush/drain notifications for this trackingId, then send the
// response and fire onDetectChanges.
//
// Note that parse/evaluation failures are NOT reported as JSON-RPC errors: the
// error text is written to the console and the regular result object is sent,
// with its "error" field taken from execContext.getError().
void handleExecuteCode(core::system::ProcessOperations& ops,
                       const json::Value& requestId,
                       const json::Object& params)
{
   DLOG("Handling runtime/executeCode request");

   // Extract required parameters
   std::string language;
   std::string code;
   std::string trackingId;

   Error error = json::readObject(params,
      "language", language,
      "code", code,
      "trackingId", trackingId);

   if (error)
   {
      sendJsonRpcError(ops, requestId, -32602,
         "Invalid params: " + error.getMessage());
      return;
   }

   // Validate language (only R supported currently)
   if (language != "r")
   {
      sendJsonRpcError(ops, requestId, -32602,
         "Unsupported language: " + language + ". Only 'r' is currently supported.");
      return;
   }

   // Extract optional options object
   json::Object options;
   json::readObject(params, "options", options);  // Ignore error if missing

   bool captureOutput = true;
   bool capturePlot = false;
   // NOTE(review): 'timeout' is parsed below but never used afterwards in this
   // function, so no time limit is applied here - confirm whether it is
   // enforced elsewhere or is still to be implemented.
   int timeout = 30000;

   // Return values are deliberately ignored: a missing option keeps its default
   json::readObject(options, "captureOutput", captureOutput);
   json::readObject(options, "capturePlot", capturePlot);
   json::readObject(options, "timeout", timeout);

   // Check if this request was already cancelled (pre-cancellation of queued request)
   // and register tracking ID for cancellation support
   {
      boost::mutex::scoped_lock lock(s_executionTrackingMutex);

      // Check if already cancelled
      if (s_cancelledTrackingIds.count(trackingId) > 0)
      {
         s_cancelledTrackingIds.erase(trackingId);

         // Return a response indicating cancellation
         json::Object result;
         result["output"] = "";
         result["error"] = "Execution cancelled";
         result["plots"] = json::Array();
         result["executionTime"] = 0;

         sendJsonRpcResponse(ops, requestId, result);
         // Note: No need to unregister here as we never registered (this check is before registration)
         return;
      }

      // Register as current execution
      s_currentTrackingId = trackingId;
   }

   // Register this execution as active to enable notification filtering
   registerActiveExecution(trackingId);

   // Record start time
   auto startTime = std::chrono::steady_clock::now();

   // Create execution context with streaming support
   // Pass shared_ptr for safe access from background thread
   ChatExecContext execContext(captureOutput, trackingId, ops.shared_from_this());
   execContext.connect();

   // Record the display list length before execution so we can detect if
   // plotting actually occurred (to avoid capturing stale plots)
   int displayListLengthBefore = 0;
   if (capturePlot)
   {
      r::sexp::Protect lenProtect;
      SEXP lenSEXP = R_NilValue;
      Error lenError = r::exec::RFunction(".rs.chat.getDisplayListLength").call(
         &lenSEXP, &lenProtect);
      // On any failure or unexpected result type the baseline stays at 0
      if (!lenError && lenSEXP != R_NilValue && Rf_isInteger(lenSEXP) && Rf_length(lenSEXP) > 0)
         displayListLengthBefore = INTEGER(lenSEXP)[0];
   }

   // Echo source code with prompts (like evaluate does)
   echoSourceCode(code);

   // Evaluate the code. For multi-line code, we parse it into separate expressions
   // and evaluate each one, mimicking REPL behavior where each line is evaluated
   // and visible results are auto-printed. This is necessary because wrapping
   // multi-statement code in withVisible({...}) causes only the last statement's
   // result to be captured, losing intermediate htmlwidgets or other visible results.
   //
   // IMPORTANT ASSUMPTIONS:
   // 1. This assumes complete, well-formed expressions. Incomplete expressions
   //    (e.g., multi-line function definitions without braces on first line) will
   //    fail to parse.
   // 2. Expression-by-expression evaluation matches R REPL behavior but differs
   //    from evaluate::evaluate() for some edge cases.
   // 3. Each expression is evaluated independently in the global environment.
   //
   // Examples:
   //   Supported:     x <- 1; y <- 2; x + y
   //   Supported:     f <- function() { x <- 1; x + 1 }
   //   NOT supported: f <- function()\n{\n  x <- 1\n}  (parse error)
   r::sexp::Protect protect;

   // Parse the code into expressions
   SEXP parsedSEXP = R_NilValue;
   error = r::exec::RFunction("parse")
      .addParam("text", code)
      .addParam("keep.source", false)
      .call(&parsedSEXP, &protect);

   // A parse failure is only logged here; 'error' stays set so the evaluation
   // loop is skipped and the error-reporting block further down emits it.
   if (error)
   {
      LOG_ERROR_MESSAGE("Failed to parse code: " + error.getMessage());
   }

   // Evaluate each expression with withVisible() to capture visibility
   // Declare these outside the block so they're available for error reporting
   int numExpressions = 0;
   int currentExpressionIndex = 0;
   // NOTE(review): currentExpressionText is declared but never read or written
   // in this function.
   std::string currentExpressionText;

   if (!error && parsedSEXP != R_NilValue && TYPEOF(parsedSEXP) == EXPRSXP)
   {
      numExpressions = Rf_length(parsedSEXP);

      // Evaluate each expression
      for (int i = 0; i < numExpressions && !error; i++)
      {
         // Check for cancellation before each expression
         // NOTE(review): this check runs before currentExpressionIndex is
         // updated for iteration i, so a cancellation detected here is reported
         // below using the index of the previously evaluated expression.
         {
            boost::mutex::scoped_lock lock(s_executionTrackingMutex);
            if (s_cancelledTrackingIds.count(trackingId) > 0)
            {
               s_cancelledTrackingIds.erase(trackingId);
               error = Error(boost::system::errc::operation_canceled,
                            "Execution cancelled", ERROR_LOCATION);
               break;
            }
         }

         SEXP exprSEXP = VECTOR_ELT(parsedSEXP, i);
         SEXP withVisibleResultSEXP = R_NilValue;

         // Store current expression index for error reporting
         currentExpressionIndex = i;

         // Evaluate this expression with withVisible()
         error = r::exec::RFunction("withVisible")
            .addParam(exprSEXP)
            .call(&withVisibleResultSEXP, &protect);

         // Stop at the first failing expression; later ones are not evaluated
         if (error)
            break;

         // Extract result and visibility for this expression
         // withVisible() should return a list with two elements: value and visible
         if (withVisibleResultSEXP == R_NilValue)
         {
            // This shouldn't happen - withVisible() returned NULL
            LOG_ERROR_MESSAGE("withVisible() returned NULL for expression " +
               std::to_string(i + 1) + " of " + std::to_string(numExpressions));
            continue;  // Skip to next expression
         }

         if (TYPEOF(withVisibleResultSEXP) != VECSXP)
         {
            // Unexpected type - withVisible() should always return a list
            LOG_ERROR_MESSAGE("withVisible() returned unexpected type " +
               r::sexp::typeAsString(withVisibleResultSEXP) + " (expected list) for expression " +
               std::to_string(i + 1) + " of " + std::to_string(numExpressions));
            continue;  // Skip to next expression
         }

         if (Rf_length(withVisibleResultSEXP) < 2)
         {
            // Malformed result - withVisible() should return 2-element list
            LOG_ERROR_MESSAGE("withVisible() returned list with length " +
               std::to_string(Rf_length(withVisibleResultSEXP)) + " (expected 2) for expression " +
               std::to_string(i + 1) + " of " + std::to_string(numExpressions));
            continue;  // Skip to next expression
         }

         // All validations passed - extract result and visibility
         SEXP exprResult = VECTOR_ELT(withVisibleResultSEXP, 0);
         SEXP visibleSEXP = VECTOR_ELT(withVisibleResultSEXP, 1);

         // Validate visibility flag type and length
         if (visibleSEXP == R_NilValue || TYPEOF(visibleSEXP) != LGLSXP ||
             Rf_length(visibleSEXP) == 0)
         {
            LOG_ERROR_MESSAGE("withVisible() returned invalid visibility flag (NULL, wrong type, or zero-length) for expression " +
               std::to_string(i + 1) + " of " + std::to_string(numExpressions));
            continue;  // Skip printing this result
         }

         int visibleValue = Rf_asLogical(visibleSEXP);
         if (visibleValue == NA_LOGICAL)
         {
            // NA visibility - treat as FALSE but log warning
            LOG_ERROR_MESSAGE("withVisible() returned NA for visibility flag, treating as FALSE for expression " +
               std::to_string(i + 1) + " of " + std::to_string(numExpressions));
            visibleValue = FALSE;
         }

         bool exprVisible = (visibleValue == TRUE);

         // Print visible results immediately (mimics REPL)
         if (exprVisible)
         {
            Error printError = r::exec::RFunction("print")
               .addParam(exprResult)
               .call();

            // A print failure is reported but does not set 'error', so the loop
            // continues with the next expression.
            if (printError)
            {
               // Print errors should be visible to users, not just logged
               std::string printErrorMsg = "Error printing result: " + printError.getMessage();
               LOG_ERROR_MESSAGE(printErrorMsg);

               // Also show to user via console (similar to how R handles print errors)
               std::string userMsg = "Warning: " + printErrorMsg + "\n";
               module_context::events().onConsoleOutput(
                  module_context::ConsoleOutputError, userMsg);
               module_context::consoleWriteError(userMsg);
            }
         }
      }
   }

   // Handle parse/runtime errors
   // We need to both:
   // 1. Fire the onConsoleOutput signal so our callback can capture it
   // 2. Write to console UI via consoleWriteError for user visibility
   if (error)
   {
      std::string errorMsg = error.getMessage();

      // Add expression context for multi-statement code
      // (numExpressions is still 0 after a parse failure, so parse errors take
      // the single-expression branch below)
      if (numExpressions > 1)
      {
         std::string contextPrefix = "Error in expression " +
            std::to_string(currentExpressionIndex + 1) + " of " +
            std::to_string(numExpressions);

         // Try to deparse the failing expression (only on error, avoids S4 issues for successful code)
         // If deparse fails (e.g., S4 objects like ggplot2), we just show expression number without text
         if (currentExpressionIndex >= 0 && parsedSEXP != R_NilValue &&
             TYPEOF(parsedSEXP) == EXPRSXP && currentExpressionIndex < Rf_length(parsedSEXP))
         {
            SEXP failedExprSEXP = VECTOR_ELT(parsedSEXP, currentExpressionIndex);
            SEXP deparsedSEXP = R_NilValue;

            // Try to deparse - if it fails, that's okay, we'll just skip the text
            Error deparseError = r::exec::RFunction("deparse")
               .addParam(failedExprSEXP)
               .addParam("width.cutoff", 80)
               .call(&deparsedSEXP, &protect);

            // Only include expression text if deparse succeeded
            if (!deparseError && deparsedSEXP != R_NilValue &&
                TYPEOF(deparsedSEXP) == STRSXP && Rf_length(deparsedSEXP) > 0)
            {
               std::string exprText = r::sexp::asString(deparsedSEXP);
               // Truncate if multi-line
               size_t newlinePos = exprText.find('\n');
               if (newlinePos != std::string::npos)
                  exprText = exprText.substr(0, newlinePos) + " ...";

               if (!exprText.empty())
                  contextPrefix += " (" + exprText + ")";
            }
         }

         contextPrefix += ": ";

         // Strip existing "Error: " or "Error in " prefix to avoid duplication
         // (7 and 9 are the lengths of those two prefixes)
         if (errorMsg.find("Error: ") == 0)
            errorMsg = errorMsg.substr(7);
         else if (errorMsg.find("Error in ") == 0)
            errorMsg = errorMsg.substr(9);

         errorMsg = contextPrefix + errorMsg;
      }
      else
      {
         // Single expression - ensure consistent "Error: " prefix
         if (errorMsg.find("Error: ") != 0 && errorMsg.find("Error in ") != 0)
            errorMsg = "Error: " + errorMsg;
      }

      std::string errorOutput = errorMsg + "\n";

      // Fire signal first so callback captures it
      module_context::events().onConsoleOutput(
         module_context::ConsoleOutputError, errorOutput);

      // Also write to console UI
      module_context::consoleWriteError(errorOutput);
   }

   // NOTE: We no longer need to print results here because we print each visible
   // expression immediately as we evaluate them in the loop above. This mimics
   // REPL behavior where each line's visible result is printed before the next
   // line is evaluated.

   // Handle plots if requested
   // NOTE: This only captures the final plot state. If code creates multiple plots
   // (e.g., plot(1); plot(2)), only the last one is captured. This is a known
   // limitation compared to the previous evaluate-based approach which used new_device.
   json::Array plotsArray;
   if (capturePlot)
   {
      // Use R helper for plot capture, passing the display list length from before
      // execution so it can detect if a NEW plot was created (vs. stale plot)
      r::sexp::Protect plotProtect;
      SEXP plotSEXP = R_NilValue;

      r::exec::RFunction captureFunc(".rs.chat.captureCurrentPlot");
      captureFunc.addParam("displayListLengthBefore", displayListLengthBefore);
      Error plotError = captureFunc.call(&plotSEXP, &plotProtect);

      if (!plotError && plotSEXP != R_NilValue && r::sexp::isList(plotSEXP))
      {
         // Extract plot data from R list
         SEXP plotNames = Rf_getAttrib(plotSEXP, R_NamesSymbol);
         if (plotNames != R_NilValue)
         {
            json::Object plotObj;
            int plotLen = Rf_length(plotSEXP);

            // Copy the recognised named elements; entries with any other name
            // or an unexpected type are ignored
            for (int j = 0; j < plotLen; j++)
            {
               if (j < Rf_length(plotNames))
               {
                  std::string name = CHAR(STRING_ELT(plotNames, j));
                  SEXP valueSEXP = VECTOR_ELT(plotSEXP, j);

                  if (name == "data" && Rf_isString(valueSEXP) && Rf_length(valueSEXP) > 0)
                  {
                     plotObj["data"] = std::string(CHAR(STRING_ELT(valueSEXP, 0)));
                  }
                  else if (name == "mimeType" && Rf_isString(valueSEXP) && Rf_length(valueSEXP) > 0)
                  {
                     plotObj["mimeType"] = std::string(CHAR(STRING_ELT(valueSEXP, 0)));
                  }
                  else if (name == "width" && Rf_isInteger(valueSEXP) && Rf_length(valueSEXP) > 0)
                  {
                     plotObj["width"] = INTEGER(valueSEXP)[0];
                  }
                  else if (name == "height" && Rf_isInteger(valueSEXP) && Rf_length(valueSEXP) > 0)
                  {
                     plotObj["height"] = INTEGER(valueSEXP)[0];
                  }
               }
            }

            // A plot entry is only reported when it actually carries data
            if (plotObj.hasMember("data"))
               plotsArray.push_back(plotObj);
         }
      }
      // Don't fail on plot errors - just continue without plots
   }

   // Flush any remaining buffered output
   // This queues final notifications to the global queue
   execContext.flushBuffers();

   // Synchronously drain THIS execution's notifications before final response
   // This guarantees correct ordering without sleeping or waiting for background processing
   // Handles concurrent executions correctly by only draining matching trackingId
   drainNotificationQueueForExecution(ops, trackingId);

   // Disconnect context
   execContext.disconnect();

   // Clear tracking ID - execution complete
   {
      boost::mutex::scoped_lock lock(s_executionTrackingMutex);
      s_currentTrackingId.clear();
      // Also clean up from cancelled set in case cancel arrived during execution
      s_cancelledTrackingIds.erase(trackingId);
   }

   // Calculate execution time in milliseconds
   auto endTime = std::chrono::steady_clock::now();
   int executionTime = static_cast<int>(
      std::chrono::duration_cast<std::chrono::milliseconds>(
         endTime - startTime).count());

   // Build response (same format as before)
   json::Object result;
   result["output"] = execContext.getOutput();
   result["error"] = execContext.getError();
   result["plots"] = plotsArray;
   result["executionTime"] = executionTime;

   // Unregister this execution - notifications after this point are stale
   unregisterActiveExecution(trackingId);

   // Send successful response
   sendJsonRpcResponse(ops, requestId, result);

   // Fire change detection event to trigger environment refresh
   module_context::events().onDetectChanges(module_context::ChangeSourceRPC);
}

// Handles the "runtime/cancelExecution" notification.
//
// The trackingId carried by the notification is always recorded in the
// cancelled set, so a request that has not started yet is rejected when it is
// eventually picked up. Queued notifications for that execution are dropped,
// and if the ID matches the execution currently in progress an R interrupt is
// requested as well. A notification without a usable trackingId is logged and
// ignored.
void handleCancelExecution(const json::Object& params)
{
   DLOG("Handling runtime/cancelExecution notification");

   // The notification is meaningless without the ID of the execution to cancel
   std::string trackingId;
   Error readError = json::readObject(params, "trackingId", trackingId);
   if (readError)
   {
      WLOG("runtime/cancelExecution: missing or invalid trackingId");
      return;
   }

   // Record the cancellation and, under the same lock, find out whether the
   // target is the execution that is running right now
   bool isRunningNow = false;
   {
      boost::mutex::scoped_lock lock(s_executionTrackingMutex);

      // Recorded unconditionally so that queued requests can be pre-cancelled
      s_cancelledTrackingIds.insert(trackingId);

      if (!s_currentTrackingId.empty())
         isRunningNow = (s_currentTrackingId == trackingId);
   }

   // Pending notifications for a cancelled execution are no longer wanted
   clearNotificationsForExecution(trackingId);

   if (!isRunningNow)
   {
      // Nothing to interrupt yet - the cancelled set takes effect when it starts
      DLOG("Recorded pre-cancellation for trackingId: {}", trackingId);
      return;
   }

   // Ask R to raise an interrupt condition in the running evaluation
   r::exec::setInterruptsPending(true);
   DLOG("Set interrupt pending for trackingId: {}", trackingId);
}

// ============================================================================
// Workspace File Content Handlers (for databot file operations)
// ============================================================================

// Map a source document to the language identifier reported to the backend.
// Modeled on SessionCopilot.cpp:languageIdFromDocument().
//
// Resolution order:
//   1. R Markdown documents and R files      -> "r"
//   2. file named "Makefile" or "makefile"   -> "makefile"
//   3. file whose stem is "Dockerfile"       -> "dockerfile"
//   4. anything else                         -> the document type, lowercased
std::string languageIdFromDocument(boost::shared_ptr<source_database::SourceDocument> pDoc)
{
   // R-flavoured documents all share a single language ID
   if (pDoc->isRMarkdownDocument())
      return "r";
   if (pDoc->isRFile())
      return "r";

   // A couple of well-known files are recognised by name rather than by type
   FilePath documentPath(pDoc->path());
   const std::string fileName = documentPath.getFilename();
   const std::string fileStem = documentPath.getStem();

   if (fileName == "Makefile" || fileName == "makefile")
      return "makefile";

   if (fileStem == "Dockerfile")
      return "dockerfile";

   // Otherwise fall back to the internal type name in lowercase
   const std::string internalType = pDoc->type();
   return boost::algorithm::to_lower_copy(internalType);
}

// Translate a file:// URI into a file system path.
//
// Input that does not start with "file://" is returned untouched, on the
// assumption that it already is a path. Otherwise the scheme prefix is
// removed, the remainder is URL-decoded, and the result is passed through
// r::util::fixPath().
std::string uriToPath(const std::string& uri)
{
   const std::string scheme("file://");

   // No file:// scheme - hand the input back as-is
   if (uri.compare(0, scheme.size(), scheme) != 0)
      return uri;

   std::size_t prefixLength = scheme.size();

#ifdef _WIN32
   // Windows drive-letter URIs look like file:///C:/path, so the slash that
   // follows the scheme has to be dropped too; elsewhere that slash is the
   // root of the path (file:///path -> /path) and must be kept
   if (uri.size() > prefixLength && uri[prefixLength] == '/')
      ++prefixLength;
#endif

   const std::string encodedPath = uri.substr(prefixLength);
   const std::string decodedPath = core::http::util::urlDecode(encodedPath);

   // Clean up double slashes
   return r::util::fixPath(decodedPath);
}

// Look up a source document by its source-database ID.
// This is how untitled documents are located, since they have no file path.
// Returns a null pointer (after logging a warning) when the lookup fails.
boost::shared_ptr<source_database::SourceDocument> findOpenDocumentById(const std::string& docId)
{
   boost::shared_ptr<source_database::SourceDocument> pDocument(
      new source_database::SourceDocument());

   Error lookupError = source_database::get(docId, pDocument);
   if (!lookupError)
      return pDocument;

   WLOG("Failed to get document by ID {}: {}", docId, lookupError.getMessage());
   return nullptr;
}

// Locate the source document that corresponds to a file path, or return a
// null pointer when there is none.
//
// Two strategies are tried in order:
//   1. A direct source-database ID lookup on the alias-resolved path.
//   2. A scan over all source documents comparing resolved paths: with a
//      filesystem equivalence test when both paths exist, and by absolute path
//      string otherwise.
//
// Documents with an empty path (never saved) can never match, and an empty
// input path returns null immediately.
boost::shared_ptr<source_database::SourceDocument> findOpenDocument(const std::string& path)
{
   using DocPtr = boost::shared_ptr<source_database::SourceDocument>;

   // Untitled documents have no path to compare against
   if (path.empty())
      return nullptr;

   FilePath targetPath = module_context::resolveAliasedPath(path);

   // Strategy 1: ask the source database for the ID that belongs to this path
   std::string docId;
   Error error = source_database::getId(targetPath, &docId);
   if (!error)
   {
      DocPtr pDoc(new source_database::SourceDocument());
      error = source_database::get(docId, pDoc);
      if (!error)
         return pDoc;
   }

   // Strategy 2: compare against every known document
   std::vector<DocPtr> openDocs;
   error = source_database::list(&openDocs);
   if (error)
   {
      WLOG("Failed to list source documents: {}", error.getMessage());
      return nullptr;
   }

   auto refersToTarget = [&targetPath](const DocPtr& pCandidate) -> bool
   {
      // Never-saved documents cannot be matched by path
      if (pCandidate->path().empty())
         return false;

      FilePath candidatePath = module_context::resolveAliasedPath(pCandidate->path());

      // With both files on disk the filesystem decides (this sees through symlinks)
      if (targetPath.exists() && candidatePath.exists())
         return candidatePath.isEquivalentTo(targetPath);

      // Otherwise compare the absolute paths as text; this still matches a
      // document whose file has been deleted or moved on disk
      return candidatePath.getAbsolutePath() == targetPath.getAbsolutePath();
   };

   auto match = std::find_if(openDocs.begin(), openDocs.end(), refersToTarget);
   if (match == openDocs.end())
      return nullptr;

   return *match;
}

// Handles the "workspace/readFileContent" JSON-RPC request.
//
// Params:
//   uri - "untitled:<docId>" for a never-saved document, otherwise a file://
//         URI or a plain (possibly aliased, e.g. ~/file.R) path
//
// Response:
//   content    - editor buffer contents when the document is open, otherwise
//                the contents of the file on disk
//   isModified - the document's dirty flag; always false for a disk read
//   languageId - only present when the content came from an open document
//
// Errors:
//   -32602  missing uri, unknown untitled document, unusable URI, file not found
//   -32603  permission denied or any other read failure
void handleReadFileContent(core::system::ProcessOperations& ops,
                           const json::Value& requestId,
                           const json::Object& params)
{
   DLOG("Handling workspace/readFileContent request");

   // Extract URI parameter
   std::string uri;
   Error error = json::readObject(params, "uri", uri);
   if (error)
   {
      sendJsonRpcError(ops, requestId, -32602, "Invalid params: uri required");
      return;
   }

   boost::shared_ptr<source_database::SourceDocument> doc;

   // Check for untitled: URI scheme
   const std::string untitledScheme("untitled:");
   if (boost::starts_with(uri, untitledScheme))
   {
      // Everything after the scheme is the source-database document ID
      std::string docId = uri.substr(untitledScheme.length());
      doc = findOpenDocumentById(docId);
      if (!doc)
      {
         sendJsonRpcError(ops, requestId, -32602, "Untitled document not found: " + uri);
         return;
      }

      DLOG("Found untitled document by ID: {}", docId);
   }
   else
   {
      // Handle file:// URIs
      std::string path = uriToPath(uri);

      if (path.empty())
      {
         sendJsonRpcError(ops, requestId, -32602, "Invalid URI format: " + uri);
         return;
      }

      // Check if file is open in editor
      doc = findOpenDocument(path);

      if (!doc)
      {
         // File not open - read from disk. Resolve the path the same way the
         // open-document lookup does, so an aliased path such as ~/file.R is
         // readable whether or not the file happens to be open in the editor.
         FilePath diskPath = module_context::resolveAliasedPath(path);

         std::string content;
         error = core::readStringFromFile(diskPath, &content);

         if (error)
         {
            // Report a missing file as a parameter problem and everything else
            // as an internal error
            if (error.getCode() == boost::system::errc::no_such_file_or_directory)
            {
               sendJsonRpcError(ops, requestId, -32602, "File not found: " + path);
            }
            else if (error.getCode() == boost::system::errc::permission_denied)
            {
               sendJsonRpcError(ops, requestId, -32603, "Permission denied: " + path);
            }
            else
            {
               sendJsonRpcError(ops, requestId, -32603, "Failed to read file: " + path);
            }
            return;
         }

         json::Object result;
         result["content"] = content;
         result["isModified"] = false;
         // No languageId for files not open in editor

         DLOG("Read file content from disk: {}", path);
         sendJsonRpcResponse(ops, requestId, result);
         return;
      }
   }

   // Common logic for open documents (both untitled and file-based)
   json::Object result;
   result["content"] = doc->contents();
   result["isModified"] = doc->dirty();
   result["languageId"] = languageIdFromDocument(doc);

   DLOG("Read file content from editor buffer (modified: {})", doc->dirty());

   sendJsonRpcResponse(ops, requestId, result);
}

// Handles the "workspace/writeFileContent" JSON-RPC request.
//
// Params:
//   uri     - "untitled:<docId>" for a never-saved document, otherwise a
//             file:// URI or a plain (possibly aliased, e.g. ~/file.R) path
//   content - the complete new text
//
// When the target is an open document, its source-database entry is updated
// and marked dirty, onDocUpdated is fired, and a "replace_ranges" editor
// command is sent to the client so the editor buffer is replaced without
// touching the file on disk. Otherwise the content is written to disk.
//
// Response: { success: true }
//
// Errors:
//   -32602  missing uri/content, unknown untitled document, unusable URI
//   -32603  source database update failure or any disk write failure
void handleWriteFileContent(core::system::ProcessOperations& ops,
                            const json::Value& requestId,
                            const json::Object& params)
{
   DLOG("Handling workspace/writeFileContent request");

   // Extract parameters
   std::string uri;
   std::string content;
   Error error = json::readObject(params, "uri", uri);
   if (error)
   {
      sendJsonRpcError(ops, requestId, -32602, "Invalid params: uri required");
      return;
   }

   error = json::readObject(params, "content", content);
   if (error)
   {
      sendJsonRpcError(ops, requestId, -32602, "Invalid params: content required");
      return;
   }

   boost::shared_ptr<source_database::SourceDocument> doc;
   std::string path;  // Only needed for disk writes

   // Check for untitled: URI scheme
   const std::string untitledScheme("untitled:");
   if (boost::starts_with(uri, untitledScheme))
   {
      // Everything after the scheme is the source-database document ID
      std::string docId = uri.substr(untitledScheme.length());
      doc = findOpenDocumentById(docId);
      if (!doc)
      {
         sendJsonRpcError(ops, requestId, -32602, "Untitled document not found: " + uri);
         return;
      }

      DLOG("Found untitled document by ID: {}", docId);
      // path stays empty - untitled documents can't be written to disk
   }
   else
   {
      // Handle file:// URIs
      path = uriToPath(uri);
      if (path.empty())
      {
         sendJsonRpcError(ops, requestId, -32602, "Invalid URI format: " + uri);
         return;
      }

      doc = findOpenDocument(path);
   }

   json::Object result;

   if (doc)
   {
      // File/untitled doc is open - update source database and notify client via editor event

      // Keep the text being replaced: the replace range sent to the editor has
      // to span the buffer as it is now, not only the text that will replace it
      const std::string previousContent = doc->contents();

      // 1. Update document contents in source database
      doc->setContents(content);
      doc->setDirty(true);

      // 2. Write to source database with proper event firing
      error = source_database::put(doc);
      if (error)
      {
         sendJsonRpcError(ops, requestId, -32603,
                         "Failed to update source database");
         return;
      }

      // Fire the critical onDocUpdated event (updates search index, code intelligence, etc.)
      source_database::events().onDocUpdated(doc);

      // 3. Calculate the end position for the replace range.
      // The last row is the number of '\n' characters in the text; the last
      // column is the length of whatever follows the final '\n'.
      auto lastRowOf = [](const std::string& text) -> int
      {
         return static_cast<int>(std::count(text.begin(), text.end(), '\n'));
      };
      auto lastColOf = [](const std::string& text) -> int
      {
         const std::size_t lastNewline = text.rfind('\n');
         const std::size_t lineStart =
            (lastNewline == std::string::npos) ? 0 : lastNewline + 1;
         return static_cast<int>(text.size() - lineStart);
      };

      const int oldLastRow = lastRowOf(previousContent);
      const int oldLastCol = lastColOf(previousContent);
      const int newLastRow = lastRowOf(content);
      const int newLastCol = lastColOf(content);

      // Use whichever end position lies further into the document. Measuring
      // only the new content would leave the tail of a longer existing buffer
      // in place after the replacement.
      int lastRow = newLastRow;
      int lastCol = newLastCol;
      if (oldLastRow > newLastRow)
      {
         lastRow = oldLastRow;
         lastCol = oldLastCol;
      }
      else if (oldLastRow == newLastRow)
      {
         lastCol = std::max(oldLastCol, newLastCol);
      }

      // 4. Send editor command to replace entire document content
      // This updates the editor buffer with an undo point, without writing to disk
      json::Object eventData;
      eventData["type"] = "replace_ranges";

      json::Object data;
      data["id"] = doc->id();

      // Create range covering entire document: [0, 0, lastRow, lastCol]
      json::Array ranges;
      json::Array range;
      range.push_back(0);           // startRow
      range.push_back(0);           // startCol
      range.push_back(lastRow);     // endRow
      range.push_back(lastCol);     // endCol
      ranges.push_back(range);
      data["ranges"] = ranges;

      // Replacement text
      json::Array text;
      text.push_back(content);
      data["text"] = text;

      eventData["data"] = data;

      // Emit the editor command event
      ClientEvent event(client_events::kEditorCommand, eventData);
      module_context::enqueClientEvent(event);

      result["success"] = true;

      DLOG("Sent editor command to replace document content");
   }
   else
   {
      // File not open - write to disk (only for file:// URIs)
      if (path.empty())
      {
         // This shouldn't happen (untitled docs are always open or error earlier)
         sendJsonRpcError(ops, requestId, -32603, "Cannot write to disk: not a file URI");
         return;
      }

      // Resolve the path the same way the open-document lookup does, so an
      // aliased path such as ~/file.R lands in the right place on disk
      FilePath diskPath = module_context::resolveAliasedPath(path);

      error = core::writeStringToFile(diskPath, content);

      if (error)
      {
         if (error.getCode() == boost::system::errc::permission_denied)
         {
            sendJsonRpcError(ops, requestId, -32603, "Permission denied: " + path);
         }
         else if (error.getCode() == boost::system::errc::no_such_file_or_directory)
         {
            // Parent directory doesn't exist
            std::string parentDir = diskPath.getParent().getAbsolutePath();
            sendJsonRpcError(ops, requestId, -32603,
                           "Directory does not exist: " + parentDir);
         }
         else
         {
            sendJsonRpcError(ops, requestId, -32603, "Failed to write file: " + path);
         }
         return;
      }

      result["success"] = true;

      DLOG("Wrote file content to disk: {}", path);
   }

   sendJsonRpcResponse(ops, requestId, result);
}

// Handles the "protocol/getVersion" JSON-RPC request.
//
// The optional clientProtocolVersion / clientVersion params are used for
// debug logging only. The response carries this side's protocol version and
// the RStudio version string.
void handleGetProtocolVersion(core::system::ProcessOperations& ops,
                               const json::Value& requestId,
                               const json::Object& params)
{
   DLOG("Handling protocol/getVersion request");

   // Both fields are optional, so read failures are deliberately ignored
   std::string reportedProtocol;
   std::string reportedClient;
   json::readObject(params, "clientProtocolVersion", reportedProtocol);
   json::readObject(params, "clientVersion", reportedClient);

   // Substitute a placeholder for anything the client did not supply
   if (reportedProtocol.empty())
      reportedProtocol = "unknown";
   if (reportedClient.empty())
      reportedClient = "unknown";

   DLOG("Client protocol version: {}, client version: {}",
        reportedProtocol,
        reportedClient);

   // Assemble and send the reply
   json::Object versionInfo;
   versionInfo["protocolVersion"] = kProtocolVersion;
   versionInfo["rstudioVersion"] = std::string(RSTUDIO_VERSION);

   sendJsonRpcResponse(ops, requestId, versionInfo);
}

// Routes a JSON-RPC request from the backend to the handler registered for
// its method name. A method with no handler is logged and answered with a
// -32601 "Method not found" error.
void handleRequest(core::system::ProcessOperations& ops,
                   const std::string& method,
                   const json::Value& requestId,
                   const json::Object& params)
{
   // runtime/*
   if (method == "runtime/getActiveSession")
   {
      handleGetActiveSession(ops, requestId);
      return;
   }

   if (method == "runtime/getDetailedContext")
   {
      handleGetDetailedContext(ops, requestId);
      return;
   }

   if (method == "runtime/executeCode")
   {
      handleExecuteCode(ops, requestId, params);
      return;
   }

   // protocol/*
   if (method == "protocol/getVersion")
   {
      handleGetProtocolVersion(ops, requestId, params);
      return;
   }

   // workspace/*
   if (method == "workspace/readFileContent")
   {
      handleReadFileContent(ops, requestId, params);
      return;
   }

   if (method == "workspace/writeFileContent")
   {
      handleWriteFileContent(ops, requestId, params);
      return;
   }

   // Nothing matched - reply with a JSON-RPC error
   WLOG("Unknown JSON-RPC request method: {}", method);
   sendJsonRpcError(ops, requestId, -32601, "Method not found");
}

// ============================================================================
// JSON-RPC Message Processing
// ============================================================================

// Classify one decoded JSON-RPC message from the backend and hand it to the
// matching handler:
//   - no 'method' field        -> response to a request we sent (only logged)
//   - 'method' and 'id' fields -> request, answered via handleRequest()
//   - 'method' without 'id'    -> notification, passed to handleNotification()
// Anything that is not a JSON object is logged and dropped.
void processBackendMessage(core::system::ProcessOperations& ops,
                           const json::Value& message)
{
   if (!message.isObject())
   {
      WLOG("Invalid JSON-RPC message: not an object");
      return;
   }

   json::Object payload = message.getObject();

   // A readable 'method' field marks a request or notification
   std::string method;
   Error methodError = json::readObject(payload, "method", method);
   if (methodError)
   {
      // No method field - this would be a response to a request we sent
      // (future implementation)
      if (chatLogLevel() >= 2)
      {
         DLOG("Received response from backend (not yet handled)");
      }
      return;
   }

   // 'params' is optional; a failed read simply leaves the object empty
   json::Object params;
   json::readObject(payload, "params", params);

   // Per JSON-RPC 2.0 only the PRESENCE of 'id' separates a request from a
   // notification; its value (string, number or null) is irrelevant here.
   const bool isNotification = (payload.find("id") == payload.end());
   if (isNotification)
   {
      handleNotification(method, params);
      return;
   }

   json::Value requestId = payload["id"];
   handleRequest(ops, method, requestId, params);
}

// Handle a logger/log notification: forward a log line produced by the
// backend to the RStudio log at the corresponding severity.
//
// params must carry string fields 'level' and 'message'; when either cannot
// be read a warning is logged and the notification is ignored. Messages
// whose level ranks below getBackendMinLogLevel() are discarded.
void handleLoggerLog(const json::Object& params)
{
   std::string level;
   std::string message;

   // readObject() yields an error when the field is missing
   Error levelError = json::readObject(params, "level", level);
   if (levelError)
   {
      WLOG("logger/log notification missing 'level' field");
      return;
   }

   Error messageError = json::readObject(params, "message", message);
   if (messageError)
   {
      WLOG("logger/log notification missing 'message' field");
      return;
   }

   // Drop anything below the configured minimum backend log level
   const auto levelPriority = getLogLevelPriority(level);
   const auto minimumPriority = getLogLevelPriority(getBackendMinLogLevel());
   if (levelPriority < minimumPriority)
      return;

   // Tag the line with "ai" and the backend's own level so it can be told
   // apart from native RStudio log output
   const std::string taggedLine = fmt::format("[ai] [{}] {}", level, message);

   if (level == "trace")
   {
      TLOG("{}", taggedLine);
      return;
   }

   if (level == "debug")
   {
      DLOG("{}", taggedLine);
      return;
   }

   if (level == "info")
   {
      ILOG("{}", taggedLine);
      return;
   }

   if (level == "warn")
   {
      WLOG("{}", taggedLine);
      return;
   }

   // "error" and "fatal" both map to the error log. Unknown levels are
   // treated as high priority (see getLogLevelPriority), so they are logged
   // as errors too to ensure visibility.
   ELOG("{}", taggedLine);
}

// Handle a busy-status notification from the backend: record the reported
// state in s_chatBusy. If the boolean 'busy' field cannot be read, the error
// is logged and the previously recorded state is left untouched.
void handleSetBusyStatus(const json::Object& params)
{
   bool reportedBusy = false;
   Error readError = json::readObject(params, "busy", reportedBusy);
   if (readError)
   {
      LOG_ERROR(readError);
      return;
   }

   // Remember the latest state reported by the backend
   s_chatBusy = reportedBusy;

   // Trace the transition through the chat-specific logging infrastructure
   const char* stateLabel = reportedBusy ? "busy" : "idle";
   DLOG("Chat backend busy status: {}", stateLabel);
}

// Background-processing hook: flush queued streaming-output notifications
// to the backend.
//
// The queue is drained completely while holding s_notificationQueueMutex;
// the actual sends happen afterwards without the lock. Notifications whose
// tracking id is no longer in s_activeTrackingIds, or whose process handle
// has expired, are dropped.
//
// isIdle - not used by this handler.
void onBackgroundProcessing(bool isIdle)
{
   std::vector<PendingNotification> toProcess;

   // Phase 1: Extract notifications to process (hold lock briefly)
   {
      boost::mutex::scoped_lock lock(s_notificationQueueMutex);

      while (!s_notificationQueue.empty())
      {
         // Move rather than copy: the queue entry is popped right away, and
         // copying the payload would only lengthen the time the lock is held
         PendingNotification notif = std::move(s_notificationQueue.front());
         s_notificationQueue.pop();

         // Only send notifications for active executions
         if (s_activeTrackingIds.find(notif.trackingId) != s_activeTrackingIds.end())
         {
            toProcess.push_back(std::move(notif));
         }
         // Notifications for inactive executions are dropped (completed/cancelled)
      }
   }

   // Phase 2: Send notifications (no lock held)
   for (const auto& notif : toProcess)
   {
      // Check if ops still valid (backend may have died)
      auto ops = notif.weakOps.lock();
      if (ops)
      {
         sendStreamingOutput(*ops, notif.trackingId, notif.type, notif.content);
      }
      // If ops expired, notification is dropped (backend dead)
   }
}

// ============================================================================
// Update Management
// ============================================================================

// Snapshot of the update check / installation state.
// A default-constructed instance means "no update known, nothing being
// installed": both flags are false, the status is Idle and all strings are
// empty.
struct UpdateState
{
   // Installation status tracking
   enum class Status
   {
      Idle,
      Downloading,
      Installing,
      Complete,
      Error
   };

   // Result of the most recent update check
   bool updateAvailable = false;       // installed and offered versions differ
   bool noCompatibleVersion = false;   // manifest has no entry for our protocol
   std::string currentVersion;         // installed version at check time
   std::string newVersion;             // version offered by the manifest
   std::string downloadUrl;            // where the offered package lives
   std::string errorMessage;

   // Progress of an installation
   Status installStatus = Status::Idle;
   std::string installMessage;

   UpdateState() = default;
};

// Global update state
// checkForUpdatesOnStartup() fills this in with the outcome of the manifest
// check.
UpdateState s_updateState;
// Held by checkForUpdatesOnStartup() for as long as it reads or writes
// s_updateState.
boost::mutex s_updateStateMutex;

// Validate that a URL uses the HTTPS scheme.
// The scheme is matched case-insensitively because URI schemes are
// case-insensitive (RFC 3986, section 3.1): "HTTPS://host/..." is just as
// much an HTTPS URL as "https://host/...". Only the prefix is inspected;
// the rest of the URL is not validated.
bool isHttpsUrl(const std::string& url)
{
   return boost::algorithm::istarts_with(url, "https://");
}

// Download the update manifest named by the pai_download_uri preference and
// parse it into *pManifest.
//
// pManifest - receives the parsed manifest object; must not be null.
//
// Returns Success() when a JSON object was obtained. Returns an error when
// pManifest is null, the preference is empty, the URL is not HTTPS, the
// download or file read fails, or the content is not a JSON object.
//
// In debug builds the RSTUDIO_CHAT_DEBUG_MANIFEST environment variable may
// point at a local file ending in "manifest.json"; if that file exists it is
// used instead of downloading anything.
Error downloadManifest(json::Object* pManifest)
{
   if (!pManifest)
      return systemError(boost::system::errc::invalid_argument, ERROR_LOCATION);

#ifndef NDEBUG
   // DEBUG builds only: Support local manifest file for testing
   std::string debugManifestPath = core::system::getenv("RSTUDIO_CHAT_DEBUG_MANIFEST");
   if (!debugManifestPath.empty())
   {
      // Validate path ends with manifest.json
      if (!boost::algorithm::ends_with(debugManifestPath, "manifest.json"))
      {
         WLOG("RSTUDIO_CHAT_DEBUG_MANIFEST must end with 'manifest.json', ignoring: {}",
              debugManifestPath);
      }
      else
      {
         FilePath debugManifestFile(debugManifestPath);
         if (debugManifestFile.exists())
         {
            DLOG("DEBUG: Using local manifest file: {}", debugManifestPath);

            // Read and parse JSON from local file
            std::string manifestContent;
            Error error = core::readStringFromFile(debugManifestFile, &manifestContent);
            if (error)
            {
               WLOG("Failed to read debug manifest file: {}", error.getMessage());
               return error;
            }

            // Parse JSON
            json::Value manifestValue;
            if (manifestValue.parse(manifestContent))
            {
               WLOG("Failed to parse debug manifest JSON");
               return systemError(boost::system::errc::protocol_error,
                                 "Invalid JSON in debug manifest",
                                 ERROR_LOCATION);
            }

            if (!manifestValue.isObject())
            {
               return systemError(boost::system::errc::protocol_error,
                                 "Debug manifest must be a JSON object",
                                 ERROR_LOCATION);
            }

            *pManifest = manifestValue.getObject();
            DLOG("Successfully loaded and parsed debug manifest");
            return Success();
         }
         else
         {
            // A missing file is not fatal: fall through to the regular download
            WLOG("RSTUDIO_CHAT_DEBUG_MANIFEST file does not exist, ignoring: {}", debugManifestPath);
         }
      }
   }
#endif

   // Get download URI from preference
   std::string downloadUri = prefs::userPrefs().paiDownloadUri();

   // Trim whitespace from the URL
   boost::algorithm::trim(downloadUri);

   if (downloadUri.empty())
   {
      return systemError(boost::system::errc::operation_not_permitted,
                        "pai_download_uri preference not set",
                        ERROR_LOCATION);
   }

   // Validate HTTPS
   if (!isHttpsUrl(downloadUri))
   {
      WLOG("Manifest download URL must use HTTPS, rejecting: {}", downloadUri);
      return systemError(boost::system::errc::protocol_error,
                        "Manifest URL must use HTTPS protocol",
                        ERROR_LOCATION);
   }

   DLOG("Downloading manifest from: {}", downloadUri);

   // Create temp file for download
   FilePath tempFile = module_context::tempFile("manifest", "json");

   // The temp file is only a staging area for the download; remove it as
   // soon as it is no longer needed instead of leaving one behind per check.
   // Failing to remove it is logged but never fails the manifest check.
   auto discardTempFile = [&tempFile]()
   {
      Error removeError = tempFile.removeIfExists();
      if (removeError)
      {
         WLOG("Failed to remove temporary manifest file: {}", removeError.getMessage());
      }
   };

   // Use R's download.file() function
   r::exec::RFunction downloadFunc("download.file");
   downloadFunc.addParam("url", downloadUri);
   downloadFunc.addParam("destfile", tempFile.getAbsolutePath());
   downloadFunc.addParam("quiet", true);
   downloadFunc.addParam("method", "libcurl");  // Use libcurl for HTTPS support
   // NOTE(review): R documents download.file()'s '...' arguments as unused
   // and takes its timeout from options("timeout"), so this argument may have
   // no effect -- confirm the intended 30 second limit is really applied.
   downloadFunc.addParam("timeout", 30);

   Error error = downloadFunc.call();
   if (error)
   {
      WLOG("Failed to download manifest: {}", error.getMessage());
      discardTempFile();  // a partial download may have been written
      return error;
   }

   // Read the downloaded content; after this the temp file is not needed,
   // whether or not the read succeeded
   std::string manifestContent;
   error = core::readStringFromFile(tempFile, &manifestContent);
   discardTempFile();
   if (error)
   {
      WLOG("Failed to read manifest file: {}", error.getMessage());
      return error;
   }

   // Parse JSON
   json::Value manifestValue;
   if (manifestValue.parse(manifestContent))
   {
      WLOG("Failed to parse manifest JSON");
      return systemError(boost::system::errc::protocol_error,
                        "Invalid JSON in manifest",
                        ERROR_LOCATION);
   }

   if (!manifestValue.isObject())
   {
      return systemError(boost::system::errc::protocol_error,
                        "Manifest must be a JSON object",
                        ERROR_LOCATION);
   }

   *pManifest = manifestValue.getObject();
   DLOG("Successfully downloaded and parsed manifest");

   return Success();
}

// Pick the package to install for a given protocol version.
//
// The manifest's "versions" object maps protocol versions to
// { "version": <package version>, "url": <download URL> }. Among the entries
// whose protocol MAJOR version equals that of protocolVersion, the one with
// the highest protocol version wins. Entries that cannot be parsed, lack a
// field, or carry a non-HTTPS URL are skipped with a log message.
//
// manifest        - parsed manifest object
// protocolVersion - protocol version to match against
// pPackageVersion - receives the selected package version; must not be null
// pDownloadUrl    - receives the selected download URL; must not be null
//
// Returns Success() when an entry was selected; protocol_not_supported when
// no usable entry matches the major version; invalid_argument for null
// out-parameters or an unparsable protocolVersion; the read error when the
// manifest has no "versions" object.
Error getPackageInfoFromManifest(
    const json::Object& manifest,
    const std::string& protocolVersion,
    std::string* pPackageVersion,
    std::string* pDownloadUrl)
{
   if (pPackageVersion == nullptr || pDownloadUrl == nullptr)
      return systemError(boost::system::errc::invalid_argument, ERROR_LOCATION);

   // The manifest keeps all candidates under "versions"
   json::Object versions;
   Error error = json::readObject(manifest, "versions", versions);
   if (error)
   {
      WLOG("Manifest missing 'versions' field");
      return error;
   }

   // Only the major component of our own protocol version is used for matching
   SemanticVersion wantedProtocol;
   if (!wantedProtocol.parse(protocolVersion))
   {
      WLOG("Failed to parse RStudio protocol version: {}", protocolVersion);
      return systemError(boost::system::errc::invalid_argument,
                        "Invalid protocol version format",
                        ERROR_LOCATION);
   }

   DLOG("Looking for compatible protocols with major version {}", wantedProtocol.major);

   // Best candidate seen so far (meaningful only once haveCandidate is set)
   bool haveCandidate = false;
   SemanticVersion topProtocol;
   std::string topPackageVersion;
   std::string topDownloadUrl;

   for (const auto& entry : versions)
   {
      const std::string protocolKey = entry.getName();

      // The key must be a parsable protocol version...
      SemanticVersion candidate;
      if (!candidate.parse(protocolKey))
      {
         WLOG("Skipping manifest entry with invalid protocol version: {}", protocolKey);
         continue;
      }

      // ...with the same major version as ours
      if (candidate.major != wantedProtocol.major)
      {
         DLOG("Skipping protocol {} (major version mismatch)", protocolKey);
         continue;
      }

      // The value must be an object holding "version" and "url"
      json::Value rawInfo = entry.getValue();
      if (!rawInfo.isObject())
      {
         WLOG("Skipping protocol {} (value is not an object)", protocolKey);
         continue;
      }
      json::Object info = rawInfo.getObject();

      std::string candidatePackage;
      if (json::readObject(info, "version", candidatePackage))
      {
         WLOG("Skipping protocol {} (missing 'version' field)", protocolKey);
         continue;
      }

      std::string candidateUrl;
      if (json::readObject(info, "url", candidateUrl))
      {
         WLOG("Skipping protocol {} (missing 'url' field)", protocolKey);
         continue;
      }

      // Packages are only ever fetched over HTTPS
      if (!isHttpsUrl(candidateUrl))
      {
         WLOG("Skipping protocol {} (non-HTTPS URL: {})", protocolKey, candidateUrl);
         continue;
      }

      // Keep this entry only if it is the first usable one or beats the
      // current best protocol version
      const bool improves = !haveCandidate || candidate > topProtocol;
      if (!improves)
         continue;

      haveCandidate = true;
      topProtocol = candidate;
      topPackageVersion = candidatePackage;
      topDownloadUrl = candidateUrl;
      DLOG("Found compatible protocol {}.{} with package version {}",
           candidate.major, candidate.minor, candidatePackage);
   }

   if (!haveCandidate)
   {
      WLOG("No compatible protocol found in manifest for major version {}", wantedProtocol.major);
      return systemError(boost::system::errc::protocol_not_supported,
                        "No compatible protocol version found in manifest",
                        ERROR_LOCATION);
   }

   *pPackageVersion = topPackageVersion;
   *pDownloadUrl = topDownloadUrl;

   DLOG("Selected best compatible protocol {}.{}: package version={}, url={}",
        topProtocol.major, topProtocol.minor, topPackageVersion, topDownloadUrl);

   return Success();
}

// Compare semantic versions and determine if installation is needed
// Returns true if versions differ, enabling both upgrades (available > installed)
// and downgrades (available < installed) when RStudio version changes
// Returns false if versions are identical or if either version fails to parse
bool shouldInstallVersion(
    const std::string& installedVersion,
    const std::string& availableVersion)
{
   SemanticVersion installed, available;

   if (!installed.parse(installedVersion))
   {
      WLOG("Failed to parse installed version: {}", installedVersion);
      return false;
   }

   if (!available.parse(availableVersion))
   {
      WLOG("Failed to parse available version: {}", availableVersion);
      return false;
   }

   return available != installed;
}

// Fetch a package archive over HTTPS into destPath.
//
// url      - download location; rejected unless isHttpsUrl() accepts it
// destPath - file the archive is written to
//
// Returns Success() once the file exists and is non-empty; protocol_error
// for a non-HTTPS URL; the R error when download.file() fails; io_error when
// the download left no file or an empty one.
//
// NOTE(review): R documents download.file()'s '...' arguments as unused and
// takes its timeout from options("timeout"), so the 'timeout' argument
// passed below may have no effect -- confirm.
Error downloadPackage(const std::string& url, const FilePath& destPath)
{
   const bool secureUrl = isHttpsUrl(url);
   if (!secureUrl)
   {
      WLOG("Package download URL must use HTTPS, rejecting: {}", url);
      return systemError(boost::system::errc::protocol_error,
                        "Package download URL must use HTTPS protocol",
                        ERROR_LOCATION);
   }

   const std::string destination = destPath.getAbsolutePath();
   DLOG("Downloading package from: {} to: {}", url, destination);

   // Delegate the transfer to R's download.file()
   r::exec::RFunction fetch("download.file");
   fetch.addParam("url", url);
   fetch.addParam("destfile", destination);
   fetch.addParam("quiet", false);      // progress output gives the user feedback
   fetch.addParam("method", "libcurl");
   fetch.addParam("mode", "wb");        // the archive is binary (zip)
   fetch.addParam("timeout", 60);       // larger file than the manifest

   Error fetchError = fetch.call();
   if (fetchError)
   {
      WLOG("Failed to download package: {}", fetchError.getMessage());
      return fetchError;
   }

   // A call that reported success must have produced a non-empty file
   const bool haveContent = destPath.exists() && destPath.getSize() != 0;
   if (!haveContent)
   {
      return systemError(boost::system::errc::io_error,
                        "Downloaded file is empty or missing",
                        ERROR_LOCATION);
   }

   DLOG("Successfully downloaded package ({} bytes)", destPath.getSize());
   return Success();
}

// Install a downloaded package archive into the user's Posit AI directory.
//
// Sequence: delete any stale backup, move the current installation aside as
// the backup, create a fresh target directory, unzip the archive into it and
// verify the result. If creating, extracting or verifying fails, the new
// directory is cleaned up where applicable and the backup is moved back into
// place; problems during that rollback are logged but do not replace the
// original error. On success the backup is deleted.
//
// packagePath - archive to extract (passed to R's unzip())
//
// Returns Success() when the new installation passed verification, otherwise
// the error of the step that failed.
Error installPackage(const FilePath& packagePath)
{
   FilePath userDataDir = xdg::userDataDir();
   FilePath aiDir = userDataDir.completePath(kPositAiDirName);
   // NOTE(review): the backup name is hard-coded rather than derived from
   // kPositAiDirName -- confirm that is intended.
   FilePath aiPrevDir = userDataDir.completePath("ai.prev");

   DLOG("Installing package from: {}", packagePath.getAbsolutePath());

   // Step 1: Remove old backup if it exists
   if (aiPrevDir.exists())
   {
      DLOG("Removing old backup directory: {}", aiPrevDir.getAbsolutePath());
      Error error = aiPrevDir.removeIfExists();
      if (error)
      {
         WLOG("Failed to remove old backup: {}", error.getMessage());
         return error;
      }
   }

   // Step 2: Backup current installation if it exists
   if (aiDir.exists())
   {
      DLOG("Backing up current installation to: {}", aiPrevDir.getAbsolutePath());
      Error error = aiDir.move(aiPrevDir);
      if (error)
      {
         WLOG("Failed to backup current installation: {}", error.getMessage());
         return error;
      }
   }

   // Step 3: Create new ai directory
   Error error = aiDir.ensureDirectory();
   if (error)
   {
      WLOG("Failed to create ai directory: {}", error.getMessage());
      // Try to restore backup. A failed restore leaves the user without an
      // installation, so it must at least be visible in the log.
      if (aiPrevDir.exists())
      {
         Error restoreError = aiPrevDir.move(aiDir);
         if (restoreError)
         {
            ELOG("Failed to restore backup after directory creation failure: {}", restoreError.getMessage());
         }
      }
      return error;
   }

   // Step 4: Extract package using R's unzip()
   DLOG("Extracting package to: {}", aiDir.getAbsolutePath());
   r::exec::RFunction unzipFunc("unzip");
   unzipFunc.addParam("zipfile", packagePath.getAbsolutePath());
   unzipFunc.addParam("exdir", aiDir.getAbsolutePath());

   error = unzipFunc.call();
   if (error)
   {
      WLOG("Failed to extract package: {}", error.getMessage());
      // Clean up partial extraction
      Error cleanupError = aiDir.removeIfExists();
      if (cleanupError)
      {
         ELOG("Failed to clean up failed extraction directory: {}", cleanupError.getMessage());
      }
      // Restore backup
      if (aiPrevDir.exists())
      {
         Error restoreError = aiPrevDir.move(aiDir);
         if (restoreError)
         {
            ELOG("Failed to restore backup after extraction failure: {}", restoreError.getMessage());
         }
      }
      return error;
   }

   // Step 5: Verify installation
   if (!verifyPositAiInstallation(aiDir))
   {
      WLOG("Extracted package failed verification");
      // Clean up invalid extraction
      Error cleanupError = aiDir.removeIfExists();
      if (cleanupError)
      {
         ELOG("Failed to clean up invalid extraction directory: {}", cleanupError.getMessage());
      }
      // Restore backup
      if (aiPrevDir.exists())
      {
         Error restoreError = aiPrevDir.move(aiDir);
         if (restoreError)
         {
            ELOG("Failed to restore backup after verification failure: {}", restoreError.getMessage());
         }
      }
      return systemError(boost::system::errc::io_error,
                        "Extracted package is incomplete or invalid",
                        ERROR_LOCATION);
   }

   // Step 6: Success - remove backup
   if (aiPrevDir.exists())
   {
      DLOG("Installation successful, removing backup");
      Error backupCleanup = aiPrevDir.removeIfExists();
      if (backupCleanup)
      {
         WLOG("Failed to remove backup directory after successful install: {}", backupCleanup.getMessage());
         // Don't fail the installation for this - backup will be cleaned up next time
      }
   }

   DLOG("Package installation complete");
   return Success();
}

// Startup update check: compare the installed package version with the one
// the manifest offers for our protocol version and record the outcome in
// s_updateState.
//
// Always returns Success(). Every failure (manifest unavailable, manifest
// unusable, ...) is logged and swallowed so that a broken update check never
// gets in the way of using the feature. s_updateStateMutex is held for the
// whole call, including the manifest download.
Error checkForUpdatesOnStartup()
{
   boost::mutex::scoped_lock lock(s_updateStateMutex);

   // Both the feature preference and a download URI are required
   const bool eligible =
      prefs::userPrefs().pai() && !prefs::userPrefs().paiDownloadUri().empty();
   if (!eligible)
   {
      DLOG("Update check skipped: pai preferences not configured");
      return Success();
   }

   DLOG("Checking for updates on startup");

   // No installation is treated as version 0.0.0
   std::string installedVersion = getInstalledVersion();
   if (installedVersion.empty())
   {
      DLOG("No installation found, checking for initial install");
      installedVersion = "0.0.0";
   }
   s_updateState.currentVersion = installedVersion;

   // Fetch the manifest; failing to do so ends the check quietly
   json::Object manifest;
   Error error = downloadManifest(&manifest);
   if (error)
   {
      WLOG("Failed to download manifest: {}", error.getMessage());
      return Success();
   }

   // Look up what the manifest offers for our protocol version
   std::string packageVersion;
   std::string downloadUrl;
   error = getPackageInfoFromManifest(manifest, kProtocolVersion, &packageVersion, &downloadUrl);
   if (error)
   {
      WLOG("Failed to get package info from manifest: {}", error.getMessage());
      WLOG("Error code: {}, Expected: {}", error.getCode(),
           static_cast<int>(boost::system::errc::protocol_not_supported));

      // protocol_not_supported means the manifest has nothing for our
      // protocol version, i.e. this RStudio version is incompatible. Every
      // other error is a silent failure.
      const bool protocolMissing =
         (error.getCode() == boost::system::errc::protocol_not_supported);
      if (protocolMissing)
      {
         s_updateState.noCompatibleVersion = true;
         s_updateState.updateAvailable = false;
      }

      return Success();
   }

   // Identical (or unparsable) versions: nothing to offer
   if (!shouldInstallVersion(installedVersion, packageVersion))
   {
      DLOG("No update needed (installed: {}, available: {})", installedVersion, packageVersion);
      s_updateState.updateAvailable = false;
      return Success();
   }

   // Versions differ: work out the direction purely for the log message.
   // shouldInstallVersion() already parsed both strings successfully, so the
   // else branch is defensive only.
   SemanticVersion installed, available;
   if (installed.parse(installedVersion) && available.parse(packageVersion))
   {
      const bool isDowngrade = (available < installed);
      DLOG("{} available: {} -> {}",
           isDowngrade ? "Downgrade" : "Update",
           installedVersion, packageVersion);
   }
   else
   {
      WLOG("Version re-parsing failed unexpectedly: {} -> {}",
           installedVersion, packageVersion);
      DLOG("Update available: {} -> {}", installedVersion, packageVersion);
   }

   // Publish the offer (upgrade or downgrade alike)
   s_updateState.updateAvailable = true;
   s_updateState.newVersion = packageVersion;
   s_updateState.downloadUrl = downloadUrl;

   return Success();
}

// ============================================================================
// Port Allocation
// ============================================================================

// Ask the operating system for a currently unused TCP port.
//
// Binds an IPv4 acceptor to port 0, which makes the OS choose a free port,
// reads the chosen port back and closes the acceptor again.
//
// pPort - receives the port number; must not be null.
//
// Returns Success() with *pPort set, invalid_argument when pPort is null, or
// the socket error of the step that failed. All socket calls use their
// error_code overloads so that failures are reported through the returned
// Error rather than as exceptions.
//
// Note that the port is only held while the acceptor is open; once this
// function returns nothing prevents another process from binding it first.
Error allocatePort(int* pPort)
{
   if (!pPort)
      return systemError(boost::system::errc::invalid_argument, ERROR_LOCATION);

   // Use bind to port 0 and let OS assign
   using boost::asio::ip::tcp;
   boost::asio::io_context ioContext;
   tcp::acceptor acceptor(ioContext);

   tcp::endpoint endpoint(tcp::v4(), 0); // Port 0 = OS assigns
   boost::system::error_code ec;
   acceptor.open(endpoint.protocol(), ec);
   if (ec)
      return systemError(ec, ERROR_LOCATION);

   acceptor.bind(endpoint, ec);
   if (ec)
      return systemError(ec, ERROR_LOCATION);

   // Read back the port the OS picked
   const tcp::endpoint boundEndpoint = acceptor.local_endpoint(ec);
   if (ec)
      return systemError(ec, ERROR_LOCATION);

   *pPort = boundEndpoint.port();

   // The port is already known, so a failure to close is deliberately not
   // treated as an allocation failure
   acceptor.close(ec);

   return Success();
}

// ============================================================================
// WebSocket URL Construction
// ============================================================================

// Build the base WebSocket URL under which the client reaches the chat
// backend listening on `port`.
//
// Server mode (RSTUDIO_SERVER builds running in server program mode): the
// localhost URL is translated to its port-mapped path, which is appended to
// the active client URL with http(s) replaced by ws(s), e.g.
//    wss://hostname:8787/s/abc123/p/58fab3e4/ai-chat
// Desktop mode: ws://127.0.0.1:{port}/ai-chat
//
// In both cases the result ends in "/ai-chat"; the client appends "/ws".
std::string buildWebSocketUrl(int port)
{
#ifdef RSTUDIO_SERVER
   if (options().programMode() == kSessionProgramModeServer)
   {
      // Address of the backend as seen from this machine
      const std::string portText = boost::lexical_cast<std::string>(port);
      std::string localhostUrl = "http://127.0.0.1:" + portText;

      DLOG("Building WebSocket URL for port {}, localhost URL: {}", port, localhostUrl);

      // Port-mapped form of that address (a relative path such as
      // "p/58fab3e4" or "/p/58fab3e4"), normalized to exactly one leading
      // slash and no trailing slash
      std::string portmappedPath = url_ports::mapUrlPorts(localhostUrl);
      if (!portmappedPath.empty() && portmappedPath.front() != '/')
         portmappedPath.insert(portmappedPath.begin(), '/');
      if (!portmappedPath.empty() && portmappedPath.back() == '/')
         portmappedPath.pop_back();

      DLOG("Normalized portmapped path: {}", portmappedPath);

      // URL the client used to reach this session
      std::string baseUrl = persistentState().activeClientUrl();
      DLOG("Base URL: {}", baseUrl);

      // Swap the HTTP scheme for the matching WebSocket scheme while keeping
      // host, port and any session prefix (e.g. /s/abc123/) intact
      const std::string httpsPrefix = "https://";
      const std::string httpPrefix = "http://";

      std::string wsScheme = "ws://";
      std::string hostAndPath;
      if (baseUrl.rfind(httpsPrefix, 0) == 0)
      {
         wsScheme = "wss://";
         hostAndPath = baseUrl.substr(httpsPrefix.size());
      }
      else if (baseUrl.rfind(httpPrefix, 0) == 0)
      {
         hostAndPath = baseUrl.substr(httpPrefix.size());
      }
      else
      {
         // Fallback - shouldn't happen
         WLOG("Unexpected base URL format: {}", baseUrl);
         hostAndPath = baseUrl;
      }

      // The port-mapped path supplies the separating slash
      if (!hostAndPath.empty() && hostAndPath.back() == '/')
         hostAndPath.pop_back();

      // The proxy will route this to http://127.0.0.1:{port}/ai-chat/ws
      std::string wsUrl = wsScheme + hostAndPath + portmappedPath + "/ai-chat";
      DLOG("Final WebSocket URL: {}", wsUrl);

      return wsUrl;
   }
#endif

   // Desktop mode: include /ai-chat base path for AIServer routing.
   // The client will append /ws to get ws://127.0.0.1:{port}/ai-chat/ws
   std::string desktopUrl = "ws://127.0.0.1:";
   desktopUrl += boost::lexical_cast<std::string>(port);
   desktopUrl += "/ai-chat";
   DLOG("Desktop WebSocket URL: {}", desktopUrl);
   return desktopUrl;
}

// ============================================================================
// Process Management
// ============================================================================

// Stdout callback for the backend process: accumulate raw output and
// dispatch every complete JSON-RPC message found in it.
//
// Messages are framed as a header block terminated by "\r\n\r\n" that
// contains a Content-Length header, followed by exactly that many BYTES of
// JSON. Output may arrive in arbitrary chunks, so unconsumed data is kept in
// s_backendOutputBuffer between calls. Header blocks without a usable
// Content-Length are logged and discarded.
//
// ops    - process handle, passed on so request handlers can reply
// output - newly received stdout data
void onBackendStdout(core::system::ProcessOperations& ops, const std::string& output)
{
   // Append new output to buffer
   s_backendOutputBuffer.append(output);

   // Process all complete messages in the buffer
   // NOTE: We process messages immediately here (not deferred) because:
   // 1. callbacksRequireMainThread=true means we're already on the main thread
   // 2. We need the valid 'ops' reference to send responses to requests
   while (true)
   {
      // Find the end of headers (blank line: \r\n\r\n) first
      // This ensures we only parse Content-Length from a valid header block
      size_t headerEnd = s_backendOutputBuffer.find("\r\n\r\n");
      if (headerEnd == std::string::npos)
      {
         // Headers not complete yet
         break;
      }

      // Extract header block (everything before \r\n\r\n); the body starts
      // right after the 4-byte terminator
      std::string headerBlock = s_backendOutputBuffer.substr(0, headerEnd);
      size_t bodyStart = headerEnd + 4;

      // Look for Content-Length header within this header block only
      // This prevents matching stray "Content-Length:" text in non-protocol stdout
      boost::smatch matches;
      if (!boost::regex_search(headerBlock, matches, s_contentLengthRegex))
      {
         // No Content-Length in this header block - malformed JSON-RPC message
         WLOG("JSON-RPC message missing Content-Length header, skipping malformed message");
         // Skip past this malformed message (discard up to and including \r\n\r\n)
         s_backendOutputBuffer.erase(0, bodyStart);
         continue;
      }

      // Extract content length
      std::string lengthStr = matches[1].str();
      int contentLength = safe_convert::stringTo<int>(lengthStr, -1);
      if (contentLength <= 0)
      {
         WLOG("Invalid Content-Length value in backend message: {}", lengthStr);
         // Skip past this malformed message
         s_backendOutputBuffer.erase(0, bodyStart);
         continue;
      }

      // Calculate where the body ends
      size_t bodyEnd = bodyStart + static_cast<size_t>(contentLength);

      // Check if we have the complete body
      if (s_backendOutputBuffer.size() < bodyEnd)
      {
         // Body not complete yet
         break;
      }

      // Extract the message body (IMPORTANT: byte-based, not character-based)
      std::string messageBody = s_backendOutputBuffer.substr(bodyStart, contentLength);

      // Consume the message BEFORE dispatching it. The offsets above describe
      // the buffer as it is right now, and the handlers invoked below are
      // outside this function's control: if anything reached from them
      // modifies s_backendOutputBuffer (onBackendExit() clears it, for
      // example), trimming afterwards would act on stale offsets.
      s_backendOutputBuffer.erase(0, bodyEnd);

      // Verbose logging (filtered by backend log level for logger/log notifications)
      if (chatLogLevel() >= 2 && shouldLogBackendMessage(messageBody))
      {
         DLOG("Received message from backend: {}", messageBody);
      }

      // Parse JSON
      json::Value messageValue;
      if (messageValue.parse(messageBody))
      {
         WLOG("Failed to parse JSON from backend: {}", messageBody);
      }
      else
      {
         // Process message immediately while ops is valid
         processBackendMessage(ops, messageValue);
      }
   }
}

void onBackendStderr(core::system::ProcessOperations& ops, const std::string& output)
{
   WLOG("Chat backend stderr: {}", output);
}

void onBackendExit(int exitCode)
{
   WLOG("Chat backend exited with code: {}", exitCode);

   // Clear chat backend busy state to prevent stuck suspension blocking
   if (s_chatBusy)
   {
      s_chatBusy = false;
      DLOG("Cleared chat backend busy state on process exit");
   }

   // Clear state
   s_chatBackendPid = -1;
   s_chatBackendPort = -1;
   s_chatBackendUrl.clear();
   s_backendOutputBuffer.clear();
   s_chatBackendRestartCount = kMaxRestartAttempts;

   // Notify client of unexpected backend exit
   json::Object exitData;
   exitData["exit_code"] = exitCode;
   exitData["crashed"] = true;
   module_context::enqueClientEvent(ClientEvent(
      client_events::kChatBackendExit,
      exitData
   ));
}

// Launches the Posit AI chat backend (a Node.js server script) through the
// process supervisor, unless a backend PID is already recorded.
//
// Returns Success() when a backend is already running or the launch request
// was accepted by the supervisor; otherwise returns the error of the step that
// failed (installation lookup, Node.js lookup, missing server script, storage
// directory creation, port allocation, or process launch).
//
// Module state: s_chatBackendPort and s_chatBackendUrl are set before the
// launch and cleared again if the launch fails. s_chatBackendPid is only set
// later, by the onStarted callback.
Error startChatBackend()
{
   // Check if already running
   if (s_chatBackendPid != -1)
      return Success();

   // Locate installation
   FilePath positAiPath = locatePositAiInstallation();
   if (positAiPath.isEmpty())
   {
      std::string userPath = xdg::userDataDir().completePath(kPositAiDirName).getAbsolutePath();
      std::string systemPath = xdg::systemConfigDir().completePath(kPositAiDirName).getAbsolutePath();
      std::string errorMsg = fmt::format(
         "Posit AI installation not found. Install to: {} (user) or {} (system)",
         userPath, systemPath);
      return systemError(boost::system::errc::no_such_file_or_directory,
                        errorMsg,
                        ERROR_LOCATION);
   }

   // Find Node.js
   FilePath nodePath;
   Error error = node_tools::findNode(&nodePath, "rstudio.positAi.nodeBinaryPath");
   if (error)
      return error;

   // Locate backend script
   FilePath serverScript = positAiPath.completeChildPath(kServerScriptPath);
   if (!serverScript.exists())
      return systemError(boost::system::errc::no_such_file_or_directory,
                        "Backend script not found: " + serverScript.getAbsolutePath(),
                        ERROR_LOCATION);

   // Create storage base path: {XDG_DATA_HOME}/ai-data/
   // This is done before any module state is written so that a failure here
   // cannot leave a port/URL recorded for a backend that was never launched.
   FilePath storagePath = xdg::userDataDir().completePath("ai-data");
   error = storagePath.ensureDirectory();
   if (error)
      return error;

   // Allocate a free port
   error = allocatePort(&s_chatBackendPort);
   if (error)
      return error;

   DLOG("Allocated port {} for chat backend", s_chatBackendPort);

   // Build WebSocket URL BEFORE launching (so it's ready when needed)
   s_chatBackendUrl = buildWebSocketUrl(s_chatBackendPort);

   // Build command arguments
   // Use relative path so Node.js resolves modules from working directory
   // (the working directory is set to the installation path below)
   std::vector<std::string> args;
   args.push_back(kServerScriptPath);
   args.push_back("-h");
   args.push_back("127.0.0.1");  // Explicitly bind to IPv4 to match client connection
   args.push_back("-p");
   args.push_back(boost::lexical_cast<std::string>(s_chatBackendPort));
   args.push_back("--json"); // Enable JSON-RPC mode

   // Add workspace path argument
   FilePath workspacePath = dirs::getInitialWorkingDirectory();
   std::string workspacePathStr = workspacePath.getAbsolutePath();
   args.push_back("--workspace");
   args.push_back(workspacePathStr);

   args.push_back("--storage");
   args.push_back(storagePath.getAbsolutePath());

   // Generate a persistent ID for this workspace directory
   std::string workspaceId = session::projectToProjectId(
       module_context::userScratchPath(),
       FilePath(),  // No shared storage - use per-user workspace IDs
       workspacePathStr
   ).id();

   args.push_back("--workspace-id");
   args.push_back(workspaceId);

   // Set up environment (copy of the current process environment)
   core::system::Options environment;
   core::system::environment(&environment);

   // Set up callbacks
   core::system::ProcessCallbacks callbacks;
   callbacks.onStarted = [](core::system::ProcessOperations& ops) {
      // The PID is what marks the backend as running for the rest of the module
      s_chatBackendPid = ops.getPid();
      DLOG("Chat backend started with PID: {}", s_chatBackendPid);
   };
   callbacks.onStdout = onBackendStdout;
   callbacks.onStderr = onBackendStderr;
   callbacks.onExit = onBackendExit;

   // Process options
   core::system::ProcessOptions processOpts;
   processOpts.allowParentSuspend = true;
   processOpts.exitWithParent = true;
   processOpts.callbacksRequireMainThread = true;
   processOpts.reportHasSubprocs = false;
#ifndef _WIN32
   processOpts.detachSession = true;
#else
   processOpts.detachProcess = true;
#endif
   processOpts.workingDir = positAiPath;
   processOpts.environment = environment;

   // Log execution details
   std::string argsStr = boost::algorithm::join(args, " ");
   DLOG("Launching chat backend: nodePath={}, args=[{}], workingDir={}, exitWithParent={}",
        nodePath.getAbsolutePath(),
        argsStr,
        processOpts.workingDir.getAbsolutePath(),
        processOpts.exitWithParent);

   // Launch via ProcessSupervisor
   error = processSupervisor().runProgram(
       nodePath.getAbsolutePath(),
       args,
       processOpts,
       callbacks);

   if (error)
   {
      // Nothing was launched: drop the endpoint recorded above
      LOG_ERROR(error);
      s_chatBackendPort = -1;
      s_chatBackendUrl.clear();
      return error;
   }

   return Success();
}

// ============================================================================
// RPC Methods
// ============================================================================

// RPC "chat_verify_installed": the result is a boolean that is true when
// locatePositAiInstallation() returns a non-empty path. Always succeeds.
Error chatVerifyInstalled(const json::JsonRpcRequest& request,
                          json::JsonRpcResponse* pResponse)
{
   const bool found = !locatePositAiInstallation().isEmpty();
   pResponse->setResult(found);
   return Success();
}

// RPC "chat_start_backend": attempts to start the backend and reports the
// outcome inside the result object rather than as an RPC error.
//
// Result fields:
//   success - true when startChatBackend() returned no error
//   error   - the error message; only present on failure
Error chatStartBackend(const json::JsonRpcRequest& request,
                       json::JsonRpcResponse* pResponse)
{
   Error launchError = startChatBackend();

   json::Object outcome;
   if (launchError)
   {
      outcome["success"] = false;
      outcome["error"] = launchError.getMessage();
   }
   else
   {
      outcome["success"] = true;
   }

   pResponse->setResult(outcome);
   return Success();
}

// RPC "chat_get_backend_url": returns the recorded backend endpoint.
//
// Result fields:
//   url   - s_chatBackendUrl as currently stored
//   port  - s_chatBackendPort as currently stored
//   ready - true only when a backend PID is recorded and the URL is non-empty
Error chatGetBackendUrl(const json::JsonRpcRequest& request,
                        json::JsonRpcResponse* pResponse)
{
   const bool hasProcess = (s_chatBackendPid != -1);
   const bool hasUrl = !s_chatBackendUrl.empty();

   json::Object endpoint;
   endpoint["url"] = s_chatBackendUrl;
   endpoint["port"] = s_chatBackendPort;
   endpoint["ready"] = (hasProcess && hasUrl);

   pResponse->setResult(endpoint);
   return Success();
}

// RPC "chat_get_backend_status": classifies the backend into one of four
// states, checked in this order:
//   "not_installed" - no installation found (an "error" text is included)
//   "stopped"       - no backend PID recorded
//   "starting"      - PID recorded but no URL stored
//   "ready"         - PID and URL available (the "url" is included)
Error chatGetBackendStatus(const json::JsonRpcRequest& request,
                           json::JsonRpcResponse* pResponse)
{
   const bool installed = !locatePositAiInstallation().isEmpty();
   const bool processKnown = (s_chatBackendPid != -1);

   json::Object status;
   if (!installed)
   {
      status["status"] = "not_installed";
      status["error"] = "Posit AI not installed.";
   }
   else if (!processKnown)
   {
      status["status"] = "stopped";
   }
   else if (s_chatBackendUrl.empty())
   {
      status["status"] = "starting";
   }
   else
   {
      status["status"] = "ready";
      status["url"] = s_chatBackendUrl;
   }

   pResponse->setResult(status);
   return Success();
}

// RPC "chat_check_for_updates": reports the update information held in
// s_updateState; no new check is performed here.
//
// When the feature preferences are not both set, only a negative answer
// (updateAvailable=false, noCompatibleVersion=false) is returned. Otherwise
// the result also carries the current/new versions, the download URL and
// isInitialInstall, which is true when the current version is "0.0.0".
Error chatCheckForUpdates(const json::JsonRpcRequest& request,
                          json::JsonRpcResponse* pResponse)
{
   boost::mutex::scoped_lock lock(s_updateStateMutex);

   json::Object result;

   // Both preferences must be set for the feature to be considered enabled
   const bool featureEnabled =
      prefs::userPrefs().pai() && !prefs::userPrefs().paiDownloadUri().empty();

   if (!featureEnabled)
   {
      // Negative answer only - don't reveal feature exists
      result["updateAvailable"] = false;
      result["noCompatibleVersion"] = false;
      pResponse->setResult(result);
      return Success();
   }

   // Cached state
   const bool firstInstall = (s_updateState.currentVersion == "0.0.0");

   result["updateAvailable"] = s_updateState.updateAvailable;
   result["noCompatibleVersion"] = s_updateState.noCompatibleVersion;
   result["currentVersion"] = s_updateState.currentVersion;
   result["newVersion"] = s_updateState.newVersion;
   result["downloadUrl"] = s_updateState.downloadUrl;
   result["isInitialInstall"] = firstInstall;

   DLOG("chatCheckForUpdates returning: updateAvailable={}, noCompatibleVersion={}",
        s_updateState.updateAvailable, s_updateState.noCompatibleVersion);

   pResponse->setResult(result);
   return Success();
}

// RPC "chat_install_update": synchronously downloads and installs the update
// described by s_updateState.
//
// Fails with operation_not_permitted when the feature preferences are not set
// or no update is available, and with operation_in_progress when an install is
// already running. Download/install failures are returned as the RPC error
// and also recorded in s_updateState (status Error plus a message). On success
// the result is a null JSON value and s_updateState is marked Complete.
//
// Locking: s_updateStateMutex is held only while s_updateState is read or
// written, never across the download, the install or filesystem cleanup, so
// that status queries can be answered while the update runs. Cleanup is done
// before the terminal status (Error/Complete) is published, so a second
// install request is still rejected as "in progress" until cleanup finishes.
Error chatInstallUpdate(const json::JsonRpcRequest& request,
                        json::JsonRpcResponse* pResponse)
{
   boost::mutex::scoped_lock lock(s_updateStateMutex);

   // Guard: Require both preferences to be set
   if (!prefs::userPrefs().pai() || prefs::userPrefs().paiDownloadUri().empty())
   {
      return systemError(boost::system::errc::operation_not_permitted,
                        "Feature not enabled",
                        ERROR_LOCATION);
   }

   // Check if update is available
   if (!s_updateState.updateAvailable)
   {
      return systemError(boost::system::errc::operation_not_permitted,
                        "No update available",
                        ERROR_LOCATION);
   }

   // Check if already in progress (anything other than Idle/Complete/Error)
   if (s_updateState.installStatus != UpdateState::Status::Idle &&
       s_updateState.installStatus != UpdateState::Status::Complete &&
       s_updateState.installStatus != UpdateState::Status::Error)
   {
      return systemError(boost::system::errc::operation_in_progress,
                        "Update already in progress",
                        ERROR_LOCATION);
   }

   // Start update process (async would be better, but doing sync for simplicity)
   s_updateState.installStatus = UpdateState::Status::Downloading;
   s_updateState.installMessage = "Downloading update...";

   // Take a private copy of the URL while the lock is still held; the shared
   // state must not be read once the mutex has been released.
   const auto downloadUrl = s_updateState.downloadUrl;

   // Unlock mutex during download/install to allow status queries
   lock.unlock();

   // Stop backend if running
   if (s_chatBackendPid != -1)
   {
      DLOG("Stopping backend for update");
      Error error = core::system::terminateProcess(s_chatBackendPid);
      if (error)
      {
         WLOG("Failed to stop backend: {}", error.getMessage());
      }
      s_chatBackendPid = -1;
      s_chatBackendPort = -1;
      s_chatBackendUrl.clear();
   }

   // Download package
   FilePath tempPackage = module_context::tempFile("pai-update", "zip");
   Error error = downloadPackage(downloadUrl, tempPackage);

   if (error)
   {
      // Clean up temp file
      Error cleanupError = tempPackage.removeIfExists();
      if (cleanupError)
         WLOG("Failed to remove temp package after download failure: {}", cleanupError.getMessage());

      {
         boost::mutex::scoped_lock lock2(s_updateStateMutex);
         s_updateState.installStatus = UpdateState::Status::Error;
         s_updateState.installMessage = "Download failed: " + error.getMessage();
      }

      return error;
   }

   // Install package
   {
      boost::mutex::scoped_lock lock2(s_updateStateMutex);
      s_updateState.installStatus = UpdateState::Status::Installing;
      s_updateState.installMessage = "Installing update...";
   }

   error = installPackage(tempPackage);

   // Always clean up temp file (do this before error handling)
   Error cleanupError = tempPackage.removeIfExists();
   if (cleanupError)
   {
      WLOG("Failed to remove temp package: {}", cleanupError.getMessage());
   }

   // Location of the backup directory that may be left over from the install
   FilePath aiPrevDir = xdg::userDataDir().completePath("ai.prev");

   if (error)
   {
      // Note: installPackage() already handles backup restoration internally,
      // so we don't need to call restoreFromBackup() here again.
      // Defensive cleanup: remove orphaned backup if it exists
      if (aiPrevDir.exists())
      {
         Error prevCleanup = aiPrevDir.removeIfExists();
         if (prevCleanup)
            WLOG("Failed to clean up backup directory after failed install: {}", prevCleanup.getMessage());
      }

      {
         boost::mutex::scoped_lock lock2(s_updateStateMutex);
         s_updateState.installStatus = UpdateState::Status::Error;
         s_updateState.installMessage = "Installation failed: " + error.getMessage();
      }

      return error;
   }

   // Success - defensive cleanup: ensure ai.prev is removed
   if (aiPrevDir.exists())
   {
      WLOG("Backup directory still exists after successful install, cleaning up");
      Error prevCleanup = aiPrevDir.removeIfExists();
      if (prevCleanup)
         WLOG("Failed to clean up backup directory: {}", prevCleanup.getMessage());
   }

   {
      boost::mutex::scoped_lock lock2(s_updateStateMutex);
      s_updateState.installStatus = UpdateState::Status::Complete;
      s_updateState.installMessage = "Update complete";
      s_updateState.updateAvailable = false;  // Clear update flag
      s_updateState.currentVersion = s_updateState.newVersion;
   }

   pResponse->setResult(json::Value());
   return Success();
}

// RPC "chat_get_update_status": reports the progress of an update install.
//
// Result fields:
//   status  - "idle", "downloading", "installing", "complete" or "error"
//   message - the current s_updateState.installMessage
//   error   - only present when status is "error"; the failure description
//
// When the feature preferences are not both set, only status "idle" is
// returned.
Error chatGetUpdateStatus(const json::JsonRpcRequest& request,
                          json::JsonRpcResponse* pResponse)
{
   boost::mutex::scoped_lock lock(s_updateStateMutex);

   json::Object result;

   // Guard: Require both preferences to be set
   if (!prefs::userPrefs().pai() || prefs::userPrefs().paiDownloadUri().empty())
   {
      // Return idle status - don't reveal feature exists
      result["status"] = "idle";
      pResponse->setResult(result);
      return Success();
   }

   // Map status enum to string
   switch (s_updateState.installStatus)
   {
      case UpdateState::Status::Idle:
         result["status"] = "idle";
         break;
      case UpdateState::Status::Downloading:
         result["status"] = "downloading";
         break;
      case UpdateState::Status::Installing:
         result["status"] = "installing";
         break;
      case UpdateState::Status::Complete:
         result["status"] = "complete";
         break;
      case UpdateState::Status::Error:
         result["status"] = "error";
         break;
   }

   result["message"] = s_updateState.installMessage;
   if (s_updateState.installStatus == UpdateState::Status::Error)
   {
      // chatInstallUpdate() records download/install failures in
      // installMessage ("Download failed: ..." / "Installation failed: ...");
      // it never writes errorMessage, so that field would not describe the
      // failure being reported here.
      result["error"] = s_updateState.installMessage;
   }

   pResponse->setResult(result);
   return Success();
}

// ============================================================================
// Module Lifecycle
// ============================================================================

// Suspend handler: records under "chat_suspended" whether a backend was
// running, terminates it if so, and clears the busy flag and the buffered
// backend output. onResume() reads "chat_suspended" to decide on a restart.
void onSuspend(const r::session::RSuspendOptions& options, Settings* pSettings)
{
   DLOG("Session suspension starting - terminating chat backend");

   // Remember the pre-suspend state for onResume()
   const bool backendRunning = (s_chatBackendPid != -1);
   pSettings->set("chat_suspended", backendRunning);

   if (backendRunning)
   {
      // A failed termination is logged but does not stop the suspend
      Error killError = core::system::terminateProcess(s_chatBackendPid);
      if (killError)
         LOG_ERROR(killError);

      // Drop the process and its endpoint (rsession is exiting anyway)
      s_chatBackendUrl.clear();
      s_chatBackendPort = -1;
      s_chatBackendPid = -1;
   }

   // Reset the JSON-RPC buffer and the busy flag regardless of backend state
   s_backendOutputBuffer.clear();
   s_chatBusy = false;
}

void onResume(const Settings& settings)
{
   DLOG("Session resuming");

   // Check if we were suspended with chat backend running
   bool wasSuspended = settings.getBool("chat_suspended", false);

   if (wasSuspended)
   {
      DLOG("Restarting chat backend after resume");

      Error error = startChatBackend();
      if (error)
         LOG_ERROR(error);
   }
}

void onShutdown(bool terminatedNormally)
{
   // Clear busy state
   if (s_chatBusy)
   {
      s_chatBusy = false;
   }

   // Terminate backend process
   if (s_chatBackendPid != -1)
   {
      DLOG("Terminating chat backend process");
      Error error = core::system::terminateProcess(s_chatBackendPid);
      if (error)
         LOG_ERROR(error);
      s_chatBackendPid = -1;
   }

   // Clear port and URL
   s_chatBackendPort = -1;
   s_chatBackendUrl.clear();

   // Clear JSON-RPC state
   s_backendOutputBuffer.clear();

   // Clear notification queue
   {
      boost::mutex::scoped_lock lock(s_notificationQueueMutex);
      while (!s_notificationQueue.empty())
         s_notificationQueue.pop();
      s_activeTrackingIds.clear();
   }
}

} // end anonymous namespace

// ============================================================================
// Public API
// ============================================================================

// Returns true when the session may suspend as far as the chat module is
// concerned, i.e. when the chat backend is not flagged as busy.
bool isSuspendable()
{
   const bool backendBusy = s_chatBusy;
   return !backendBusy;
}

// ============================================================================
// Module Initialization
// ============================================================================

// Module entry point. In order, it:
//   1. applies CHAT_LOG_LEVEL and CHAT_BACKEND_MIN_LEVEL from the environment,
//   2. registers the R call method and the JSON-RPC notification handlers,
//   3. connects the background-processing, shutdown and suspend/resume hooks,
//   4. registers the RPC methods, the /ai-chat URI handler and SessionChat.R,
//   5. runs the startup update check.
//
// Returns the registration error if step 4 fails. A failing update check is
// only logged and does not fail initialization.
Error initialize()
{
   using boost::bind;
   using namespace module_context;

   // Chat log level; a value that does not parse as an integer becomes 0
   const std::string logLevelEnv = core::system::getenv("CHAT_LOG_LEVEL");
   if (!logLevelEnv.empty())
   {
      setChatLogLevel(safe_convert::stringTo<int>(logLevelEnv, 0));
   }

   // Minimum backend log level; matched case-insensitively against the
   // known level names, anything else is rejected with a warning
   std::string backendMinLevel = core::system::getenv("CHAT_BACKEND_MIN_LEVEL");
   if (!backendMinLevel.empty())
   {
      boost::algorithm::to_lower(backendMinLevel);

      static const std::set<std::string> knownLevels = {
         "trace", "debug", "info", "warn", "error", "fatal"
      };

      if (knownLevels.count(backendMinLevel) > 0)
         setBackendMinLogLevel(backendMinLevel);
      else
         WLOG("Invalid CHAT_BACKEND_MIN_LEVEL value '{}', using default 'error'", backendMinLevel);
   }

   RS_REGISTER_CALL_METHOD(rs_chatSetLogLevel);

   // JSON-RPC notification handlers
   registerNotificationHandler("logger/log", handleLoggerLog);
   registerNotificationHandler("chat/setBusyStatus", handleSetBusyStatus);
   registerNotificationHandler("runtime/cancelExecution", handleCancelExecution);

   // Session event hooks
   events().onBackgroundProcessing.connect(onBackgroundProcessing);
   events().onShutdown.connect(onShutdown);

   // Suspend/resume hooks
   addSuspendHandler(SuspendHandler(
      boost::bind(onSuspend, _1, _2),
      onResume
   ));

   // RPC methods, URI handler and the module's R file
   ExecBlock registrations;
   registrations.addFunctions()
      (bind(registerRpcMethod, "chat_verify_installed", chatVerifyInstalled))
      (bind(registerRpcMethod, "chat_start_backend", chatStartBackend))
      (bind(registerRpcMethod, "chat_get_backend_url", chatGetBackendUrl))
      (bind(registerRpcMethod, "chat_get_backend_status", chatGetBackendStatus))
      (bind(registerRpcMethod, "chat_check_for_updates", chatCheckForUpdates))
      (bind(registerRpcMethod, "chat_install_update", chatInstallUpdate))
      (bind(registerRpcMethod, "chat_get_update_status", chatGetUpdateStatus))
      (bind(registerUriHandler, "/ai-chat", handleAIChatRequest))
      (bind(sourceModuleRFile, "SessionChat.R"))
      ;

   Error registrationError = registrations.execute();
   if (registrationError)
   {
      LOG_ERROR(registrationError);
      return registrationError;
   }

   // Startup update check; a failure is logged but does not fail initialization
   Error updateCheckError = checkForUpdatesOnStartup();
   if (updateCheckError)
   {
      WLOG("Update check failed: {}", updateCheckError.getMessage());
   }

   DLOG("SessionChat module initialized successfully, URI handler registered for /ai-chat");
   return Success();
}

} // end namespace chat
} // end namespace modules
} // end namespace session
} // end namespace rstudio
